AioAws

AioAWS Settings

Using asyncio for AWS services requires the aiobotocore library, which wraps a release of botocore to patch it with features for async coroutines using asyncio and aiohttp. To avoid issuing too many concurrent requests (DOS attack), the async approach should use a client connection limiter, based on asyncio.Semaphore(). It’s recommended to use a single session and a single client with a connection pool. Although there are context manager patterns, it’s also possible to manage closing the client after everything is done.

class aio_aws.aio_aws_config.AioAWSConfig(aws_region: str = None, retries: int = 4, min_pause: float = 5, max_pause: float = 30, min_jitter: float = 1, max_jitter: float = 10, max_pool_connections: int = 10, sem: int = 10, session: aiobotocore.session.AioSession = None)[source]

Bases: object

aws_region: str = None

an optional AWS region name

create_client(service: str, *args, **kwargs)aiobotocore.client.AioBaseClient[source]

Create and yield a new client using the AioAWSConfig.session; the clients are configured with a default limit on the size of the connection pool defined by AioAWSConfig.max_pool_connections

config = AioAWSConfig()
async with config.create_client("s3") as client:
    response = await s3_client.head_bucket(Bucket=bucket_name)

It is possible to pass through additional args and kwargs including config=botocore.client.Config.

Yield

an aiobotocore.client.AioBaseClient for AWS service

property default_client_config
classmethod get_default_config()[source]
max_jitter: float = 10

an asyncio.sleep for random.uniform(min_jitter, max_jitter)

max_pause: float = 30

an asyncio.sleep for random.uniform(min_pause, max_pause)

max_pool_connections: int = 10

defines a limit to the number of client connections

min_jitter: float = 1

an asyncio.sleep for random.uniform(min_jitter, max_jitter)

min_pause: float = 5

an asyncio.sleep for random.uniform(min_pause, max_pause)

retries: int = 4

a number of retries for an AWS client request/response

sem: int = 10
property semaphore

An asyncio.Semaphore to limit the number of concurrent clients; this defaults to the session client connection pool

session: aiobotocore.session.AioSession = None

an aiobotocore.session.AioSession

aio_aws.aio_aws_config.MAX_JITTER: float = 10

Maximum API request jitter

aio_aws.aio_aws_config.MAX_PAUSE: float = 30

Maximum task pause

aio_aws.aio_aws_config.MAX_POOL_CONNECTIONS = 10

max_pool_connections for AWS clients (10 by default)

aio_aws.aio_aws_config.MIN_JITTER: float = 1

Minimum API request jitter

aio_aws.aio_aws_config.MIN_PAUSE: float = 5

Minimum task pause

aio_aws.aio_aws_config.RETRY_EXCEPTIONS = ['TooManyRequestsException', 'ThrottlingException']

Common exception codes that are retried

aio_aws.aio_aws_config.aio_aws_client(service_name: str, *args, **kwargs)aiobotocore.client.AioBaseClient[source]

Yield an asyncio AWS client with an option to provide a client-specific config; this is a thin wrapper on aiobotocore.get_session().create_client() and the additional kwargs as passed through to session.create_client(**kwargs).

It is possible to pass through additional args and kwargs including config: aiobotocore.config.AioConfig (the default is aio_aws_default_config())

s3_endpoint = "http://localhost:5555"
client_config = botocore.client.Config(
    read_timeout=120,
    max_pool_connections=50,
)
async with aio_aws_client(
    "s3", endpoint_url=s3_endpoint, config=client_config
) as client:
    assert "aiobotocore.client.S3" in str(client)
    assert isinstance(client, aiobotocore.client.AioBaseClient)
Parameters

service_name – an AWS service for a client, like “s3”, try session.get_available_services()

Yield

aiobotocore.client.AioBaseClient

aio_aws.aio_aws_config.aio_aws_default_config()aiobotocore.config.AioConfig[source]

Get a default asyncio AWS config using a default py:const:MAX_POOL_CONNECTIONS

Returns

aiobotocore.config.AioConfig

aio_aws.aio_aws_config.aio_aws_default_session()aiobotocore.session.AioSession[source]

Get a default asyncio AWS session with a default config from aio_aws_default_config()

Returns

aiobotocore.session.AioSession

aio_aws.aio_aws_config.aio_aws_session(aio_aws_config: Optional[aiobotocore.config.AioConfig] = None)aiobotocore.session.AioSession[source]

Get an asyncio AWS session with an ‘aio-aws’ user agent name

Parameters

aio_aws_config – an aiobotocore.config.AioConfig (default aio_aws_default_config())

Returns

aiobotocore.session.AioSession

aio_aws.aio_aws_config.asyncio_default_semaphore()asyncio.locks.Semaphore[source]

a default semaphore to limit creation of clients; it defaults to 2 * py:const:MAX_POOL_CONNECTIONS

Returns

aiobotocore.config.AioConfig

async aio_aws.aio_aws_config.delay(task_id: str, min_pause: float = 5, max_pause: float = 30)float[source]

Await a random pause between MIN_PAUSE and MAX_PAUSE

Parameters
  • task_id – the ID for the task awaiting this pause

  • min_pause – defaults to MIN_PAUSE

  • max_pause – defaults to MAX_PAUSE

Returns

random interval for pause

async aio_aws.aio_aws_config.jitter(task_id: str = 'jitter', min_jitter: float = 1, max_jitter: float = 10)float[source]

Await a random pause between min_jitter and max_jitter

Parameters
  • task_id – an optional ID for the task awaiting this jitter

  • min_jitter – defaults to MIN_JITTER

  • max_jitter – defaults to MAX_JITTER

Returns

random interval for pause

AioAWS Batch

In testing this, it’s able to run and monitor 100s of jobs from a laptop with modest CPU/RAM resources. It can recover from a crash by using a db-state, without re-running the same jobs (by jobName). It should be able to scale to run 1000s of jobs.

To run the example, the aio_aws.aio_aws_batch module has a main that will run and manage about 5 live batch jobs (very small sleep jobs that don’t cost much to run). The job state is persisted to aws_batch_jobs.json and if it runs successfully, it will not run the jobs again; the TinyDB is used to recover job state by jobName. (This demo assumes that some simple AWS Batch infrastructure exists already.)

# setup the python virtualenv
# check the main details and modify for a preferred batch queue/CE and AWS region

$ ./aio_aws/aio_aws_batch.py

Test async batch jobs

# wait a few minutes and watch the status messages
# it submits and monitors the jobs until they complete
# job status is saved and updated in `aws_batch_jobs.json`
# when it's done, run it again and see that nothing is re-submitted

$ ./aio_aws/aio_aws_batch.py

If the job monitoring is halted for some reason (like CNT-C), it can recover from the db-state, e.g.

$ ./aio_aws/aio_aws_batch.py

Test async batch jobs
[INFO]  2020-03-05T14:51:53.372Z  aio-aws:<module>:485  AWS Batch job (test-sleep-job-0000) recovered from db
[INFO]  2020-03-05T14:51:53.373Z  aio-aws:<module>:485  AWS Batch job (test-sleep-job-0001) recovered from db
[INFO]  2020-03-05T14:51:53.373Z  aio-aws:<module>:485  AWS Batch job (test-sleep-job-0002) recovered from db
[INFO]  2020-03-05T14:51:53.374Z  aio-aws:<module>:485  AWS Batch job (test-sleep-job-0003) recovered from db
[INFO]  2020-03-05T14:51:53.374Z  aio-aws:<module>:485  AWS Batch job (test-sleep-job-0004) recovered from db
[INFO]  2020-03-05T14:51:53.690Z  aio-aws:aio_batch_job_waiter:375  AWS Batch job (846d54d4-c3c3-4a3b-9101-646d78d3bbfb) status: RUNNABLE
[INFO]  2020-03-05T14:51:53.692Z  aio-aws:aio_batch_job_waiter:375  AWS Batch job (dfce3461-9eab-4f5b-846c-6f223d593f6f) status: RUNNABLE
[INFO]  2020-03-05T14:51:53.693Z  aio-aws:aio_batch_job_waiter:375  AWS Batch job (637e6b27-8d4d-4f45-b988-c00775461616) status: RUNNABLE
[INFO]  2020-03-05T14:51:53.701Z  aio-aws:aio_batch_job_waiter:375  AWS Batch job (d9ac27c9-e7d3-49cd-8f53-c84a9b4c1750) status: RUNNABLE
[INFO]  2020-03-05T14:51:53.732Z  aio-aws:aio_batch_job_waiter:375  AWS Batch job (7ebfe7c4-44a4-40d6-9eab-3708e334689d) status: RUNNABLE

For the demo to run quickly, most of the module settings are fit for fast jobs. For much longer running jobs, there are functions that only submit jobs or check jobs and the settings should be changed for monitoring jobs to only check every 10 or 20 minutes.

Quick Start

from aio_aws.aws_batch_models import AWSBatchJob
from aio_aws.aio_aws_batch import batch_monitor_jobs
from aio_aws.aio_aws_batch import batch_submit_jobs

job1 = AWSBatchJob(
    job_name="sleep-1-job",
    job_definition=your_job_definition_arn,
    job_queue=your_job_queue_arn,
    command=["/bin/sh", "-c", "echo Hello && sleep 1 && echo Bye"],
)
job2 = AWSBatchJob(
    job_name="sleep-2-job",
    job_definition=your_job_definition_arn,
    job_queue=your_job_queue_arn,
    command=["/bin/sh", "-c", "echo Hello && sleep 2 && echo Bye"],
)
batch_submit_jobs(jobs=jobs)
batch_monitor_jobs(jobs=jobs)

This example uses high level synchronous wrappers to make it easy get started. There are a lot of details hidden in these convenient wrappers, but all the code is easily read in aio_aws.aio_aws_batch. For advanced use, please read the source code, especially the unit tests in the code repo. There is very little time to maintain the documentation.

Monitoring Jobs

The aio_aws.aio_aws_batch.aio_batch_job_manager() can submit a job, wait for it to complete and retry if it fails on a SPOT termination. It saves the job status using the aio_aws.aio_aws_batch import AWSBatchDB. The job manager uses aio_aws.aio_aws_batch.aio_batch_job_waiter(), which uses these settings to control the async-wait between polling the job status:

These settings control how often job descriptions are polled. These requests for job status are also limited by the client connection pool and the client semaphore used by the job manager. Since AWS Batch has API limits on the number of requests for job status, it’s best to use a client connection pool and semaphore of about 10 connections. Any failures to poll for a job status will be retried a few times (using some random jitter on retry rates).

To modify the polling frequency settings, use a custom config. For example, the unit test suite uses much faster polling on mock batch jobs to speed up the unit tests; e.g.

config = AWSBatchConfig(
    start_pause=0.4, min_pause=0.8, max_pause=1.0, min_jitter=0.1, max_jitter=0.2,
)
class aio_aws.aio_aws_batch.AWSBatchConfig(aws_region: str = None, retries: int = 4, min_pause: float = 5, max_pause: float = 30, min_jitter: float = 1, max_jitter: float = 10, max_pool_connections: int = 10, sem: int = 10, session: aiobotocore.session.AioSession = None, start_pause: float = 30, aio_batch_db: Union[aio_aws.aio_aws_batch_db.AioAWSBatchDB, NoneType] = None)[source]

Bases: aio_aws.aio_aws_config.AioAWSConfig

aio_batch_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None

an optional AioAWSBatchDB

create_batch_client()aiobotocore.client.AioBaseClient[source]

Create and yield an AWS Batch client using the AWSBatchConfig.session

Yield

an aiobotocore.client.AioBaseClient for AWS Batch

create_logs_client()aiobotocore.client.AioBaseClient[source]

Create and yield an AWS CloudWatchLogs client using the AWSBatchConfig.session

Yield

an aiobotocore.client.AioBaseClient for AWS CloudWatchLogs

start_pause: float = 30

a batch job startup pause, random.uniform(start_pause, start_pause * 2); this applies when the job status is in [“SUBMITTED”, “PENDING”, “RUNNABLE”]

aio_aws.aio_aws_batch.BATCH_STARTUP_PAUSE: float = 30

batch job startup pause (seconds)

exception aio_aws.aio_aws_batch.RetryError[source]

Bases: RuntimeError

async aio_aws.aio_aws_batch.aio_batch_cancel_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], reason: str = 'User cancelled job', config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]

Cancel jobs that can be cancelled (if they are not running or complete).

Parameters
  • jobs – any AWSBatchJob

  • config – an AWSBatchConfig

Returns

each job maintains state, so this function returns nothing

async aio_aws.aio_aws_batch.aio_batch_describe_jobs(job_ids: List[str], config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)Optional[Dict][source]

Asynchronous coroutine to issue a batch job description request that retrieves status information for up to 100 jobId

Parameters
  • job_ids – a list of batch jobId

  • config – settings for task pauses between retries

Returns

a describe_jobs response

Raises

botocore.exceptions.ClientError

async aio_aws.aio_aws_batch.aio_batch_get_logs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)[source]

Get job logs. The logs should be updated in any configured jobs-db.

Parameters
  • jobs – any AWSBatchJob

  • config – an AWSBatchConfig

Returns

each job maintains state, so this function returns nothing

async aio_aws.aio_aws_batch.aio_batch_job_cancel(job: aio_aws.aws_batch_models.AWSBatchJob, reason: str = 'User cancelled job', config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)aio_aws.aws_batch_models.AWSBatchJob[source]

Asynchronous coroutine to cancel a job i a batch job queue. Jobs that are in the SUBMITTED, PENDING, or RUNNABLE state are canceled. Jobs that have progressed to STARTING or RUNNING are not canceled, but the API operation still succeeds, even if no job is canceled. These jobs must be terminated.

Parameters
  • job – an AWSBatchJob

  • reason – a reason to cancel the job

  • config – settings for task pauses between retries

Returns

an updated AWSBatchJob

Raises

botocore.exceptions.ClientError

Raises

RetryError if it exceeds retries

async aio_aws.aio_aws_batch.aio_batch_job_logs(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)Optional[List[Dict]][source]

Asynchronous coroutine to get logs for a batch job log stream. All the events available for a log stream are collected and returned. The AWS Cloud Watch log streams can have delays, don’t expect near real-time values in these data.

Parameters
  • job – A set of job parameters

  • config – settings for task pauses between retries

Returns

a list of all the log events from get_log_events responses

Raises

botocore.exceptions.ClientError

Raises

RetryError if it exceeds retries

async aio_aws.aio_aws_batch.aio_batch_job_manager(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)aio_aws.aws_batch_models.AWSBatchJob[source]

Asynchronous coroutine to manage a batch job. The job-manager will update attributes on the job as the job passes through submission and completion. On completion, the job logs are retrieved. As the job status and logs are collected, the job data is persisted to the jobs_db.

Note that any job with a job.job_id will not re-run, those jobs will be monitored until complete. To re-run a job that has already run, first call the job.reset() method to clear any previous job state. (Any previous attempts are recorded in job.job_tries and in the job.job_description.)

Parameters
  • job – a batch job spec

  • config – settings for task pauses between retries

Returns

a describe_jobs response for job_id when it’s complete

Raises

botocore.exceptions.ClientError

async aio_aws.aio_aws_batch.aio_batch_job_status(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)aio_aws.aws_batch_models.AWSBatchJob[source]

Asynchronous coroutine to update a single job description. The job data is updated (it’s saved to an optional jobs-db).

Use aio_batch_update_jobs() for more efficient updates to many jobs.

Parameters
  • job – a batch job

  • config – settings for task pauses between retries

Returns

an updated AWSBatchJob

Raises

botocore.exceptions.ClientError

async aio_aws.aio_aws_batch.aio_batch_job_submit(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)aio_aws.aws_batch_models.AWSBatchJob[source]

Submit a batch job; for a successful submission, the jobId and submission details are updated on the job object (and an optional jobs-db).

Parameters
  • job – A set of job parameters

  • config – settings for task pauses between retries

Returns

an updated AWSBatchJob (modified in-place)

Raises

botocore.exceptions.ClientError

async aio_aws.aio_aws_batch.aio_batch_job_terminate(job: aio_aws.aws_batch_models.AWSBatchJob, reason: str = 'User terminated job', config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)aio_aws.aws_batch_models.AWSBatchJob[source]

Asynchronous coroutine to terminate a batch job

Parameters
  • job – an AWSBatchJob

  • reason – a reason to terminate the job

  • config – settings for task pauses between retries

Returns

an AWSBatchJob after it is terminated

Raises

botocore.exceptions.ClientError

async aio_aws.aio_aws_batch.aio_batch_job_waiter(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)Optional[aio_aws.aws_batch_models.AWSBatchJob][source]

Asynchronous coroutine to wait on a batch job. There is no explict timeout on a job waiter, it depends on setting a timeout on the batch job definition. The job waiter exits when the batch job status is either “SUCCEEDED” or “FAILED”. The job data is updated each time the job description is polled (it’s also saved to the jobs-db).

job status identifiers are assumed to be: [“SUBMITTED”, “PENDING”, “RUNNABLE”, “STARTING”, “RUNNING”, “FAILED”, “SUCCEEDED”]

Parameters
  • job – a batch job

  • config – settings for task pauses between retries

Returns

a describe_jobs response for job.job_id when it’s complete

Raises

botocore.exceptions.ClientError

async aio_aws.aio_aws_batch.aio_batch_monitor_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)[source]

Monitor submitted jobs until they complete.

Parameters
  • jobs – any AWSBatchJob

  • config – an AWSBatchConfig

Returns

each job maintains state, so this function returns nothing

async aio_aws.aio_aws_batch.aio_batch_run_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)[source]

Submit jobs that have not been submitted yet, and monitor all jobs until they complete.

Parameters
  • jobs – any AWSBatchJob

  • config – an AWSBatchConfig

Returns

each job maintains state, so this function returns nothing

async aio_aws.aio_aws_batch.aio_batch_submit_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)[source]

Submit jobs that have not been submitted yet.

Parameters
  • jobs – any AWSBatchJob

  • config – an AWSBatchConfig

Returns

each job maintains state, so this function returns nothing

async aio_aws.aio_aws_batch.aio_batch_terminate_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], reason: str = 'User terminated job', config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]

Terminate jobs that can be killed (if they are not complete).

Parameters
  • jobs – any AWSBatchJob

  • config – an AWSBatchConfig

Returns

each job maintains state, so this function returns nothing

async aio_aws.aio_aws_batch.aio_batch_update_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)[source]

Asynchronous coroutine to update status on batch jobs.

job status identifiers in the Batch service are: [“SUBMITTED”, “PENDING”, “RUNNABLE”, “STARTING”, “RUNNING”, “FAILED”, “SUCCEEDED”]

Parameters
  • jobs – a list of batch jobs

  • config – settings for task pauses between retries

Returns

jobs are updated in-place, with no return

Raises

botocore.exceptions.ClientError

aio_aws.aio_aws_batch.aio_find_complete_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)[source]

Find any complete jobs

Parameters
  • jobs – any AWSBatchJob

  • jobs_db – an optional jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)

Returns

yield jobs found

aio_aws.aio_aws_batch.aio_find_jobs_by_status(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], job_states: List[str], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)[source]

Find any jobs that match job states.

This is most often used when jobs are regenerated for jobs that could exist in the jobs-db; there are alternative methods on the jobs-db to find all jobs matching various job states.

Parameters
  • jobs – any AWSBatchJob

  • job_states – a list of valid job states

  • jobs_db – a jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)

Returns

yield jobs found

aio_aws.aio_aws_batch.aio_find_running_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)[source]

Find any running jobs

Parameters
  • jobs – any AWSBatchJob

  • jobs_db – an optional jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)

Returns

yield jobs found

aio_aws.aio_aws_batch.batch_cancel_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], reason: str = 'User cancelled job', jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]

Cancel jobs that can be cancelled (if they are not running or complete).

Parameters
  • jobs – any AWSBatchJob

  • jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided

  • aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db

Returns

each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.batch_get_logs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]

Get job logs.

Parameters
  • jobs – any AWSBatchJob

  • jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided

  • aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db

Returns

each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.batch_monitor_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]

Monitor submitted jobs until they complete.

Parameters
  • jobs – any AWSBatchJob

  • jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided

  • aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db

Returns

each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.batch_run_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]

Submit jobs that have not been submitted yet, and monitor all jobs until they complete.

Parameters
  • jobs – any AWSBatchJob

  • jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided

  • aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db

Returns

each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.batch_submit_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]

Submit jobs that have not been submitted yet.

Parameters
  • jobs – any AWSBatchJob

  • jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided

  • aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db

Returns

each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.batch_terminate_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], reason: str = 'User terminated job', jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]

Terminate jobs that can be killed (if they are not complete).

Parameters
  • jobs – any AWSBatchJob

  • jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided

  • aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db

Returns

each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.batch_update_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]

Update job descriptions.

Parameters
  • jobs – any AWSBatchJob

  • jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided

  • aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db

Returns

each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.find_complete_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)[source]

Find any complete jobs

Parameters
  • jobs – any AWSBatchJob

  • jobs_db – an optional jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)

Returns

yield jobs found

aio_aws.aio_aws_batch.find_incomplete_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, reset_failed: bool = False)[source]

This can find any jobs that are not complete. It can also reset FAILED jobs that could be run again.

Parameters
  • jobs – any AWSBatchJob

  • jobs_db – an optional jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)

  • reset_failed – if enabled, any FAILED jobs will be reset so they can be run again

Returns

yield jobs found

aio_aws.aio_aws_batch.find_jobs_by_status(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], job_states: List[str], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)[source]

Find any jobs that match job states.

This is most often used when jobs are regenerated for jobs that could exist in the jobs-db; there are alternative methods on the jobs-db to find all jobs matching various job states.

Parameters
  • jobs – any AWSBatchJob

  • job_states – a list of valid job states

  • jobs_db – a jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)

Returns

yield jobs found

aio_aws.aio_aws_batch.find_running_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)[source]

Find any running jobs

Parameters
  • jobs – any AWSBatchJob

  • jobs_db – an optional jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)

Returns

yield jobs found

aio_aws.aio_aws_batch.job_for_status(job, job_states)Optional[aio_aws.aws_batch_models.AWSBatchJob][source]
aio_aws.aio_aws_batch.parse_job_description(job_id: str, jobs: Dict)Optional[Dict][source]

Extract a job description for job_id from jobs :param job_id: an AWS Batch jobId :param jobs: a response to AWS Batch job descriptions :return: a job description for job_id

AioAWS Batch-DB

class aio_aws.aio_aws_batch_db.AioAWSBatchDB[source]

Bases: abc.ABC

Abstract Base Class for AWS Batch job databases

abstract async all_job_ids()Set[str][source]

Collect all jobIds.

abstract async all_jobs()List[aio_aws.aws_batch_models.AWSBatchJob][source]

Collect all jobs.

Warning: this could exceed memory, try to use the gen_all_jobs() wherever possible.

abstract async count_by_job_status()collections.Counter[source]

Count all jobs by jobStatus

Returns

a Counter of jobs by job status (could contain multiple entries for the same jobName, if it is run more than once)

abstract async find_by_job_id(job_id: str)Optional[Dict][source]

Find one job by the jobId

Parameters

job_id – a batch jobId

Returns

the job data or None

abstract async find_by_job_name(job_name: str)List[Dict][source]

Find any jobs matching the jobName

Parameters

job_name – a batch jobName

Returns

a list of dictionaries containing job data

abstract async find_by_job_status(job_states: List[str])List[aio_aws.aws_batch_models.AWSBatchJob][source]

Find any jobs matching any job status values

Parameters

job_states – a list of valid job states

Returns

a list of AWSBatchJob (could contain multiple entries with the same jobName if it is run multiple times with any jobStatus in the jobStatus values)

abstract async find_job_logs(job_id: str)Optional[Dict][source]

Find job logs by job_id

Parameters

job_id – str

Returns

job logs document

abstract async find_jobs_to_run()List[aio_aws.aws_batch_models.AWSBatchJob][source]

Find all jobs that have not SUCCEEDED.

abstract async find_latest_job_name(job_name: str)Optional[aio_aws.aws_batch_models.AWSBatchJob][source]

Find the latest job matching the jobName, based on the “createdAt” time stamp.

Parameters

job_name – a batch jobName

Returns

the latest job record available

abstract gen_all_jobs()AsyncIterator[aio_aws.aws_batch_models.AWSBatchJob][source]

Generate all jobs.

abstract gen_job_ids()AsyncIterator[str][source]

Generate all jobIds.

abstract async group_by_job_status()Dict[str, List[Tuple[str, str, str]]][source]

Group all jobId by jobStatus

Returns

a dictionary of job info by job status (could contain multiple entries with the same jobName if it is run multiple times); the dictionary values contain a list of job information tuples, e.g.

{ status: [ (job.job_id, job.job_name, job.status), ] }

abstract async jobs_recovery(jobs: List[aio_aws.aws_batch_models.AWSBatchJob])List[aio_aws.aws_batch_models.AWSBatchJob][source]

Use the job.job_name to find any jobs_db records to recover job data.

abstract async jobs_to_run(jobs: List[aio_aws.aws_batch_models.AWSBatchJob])List[aio_aws.aws_batch_models.AWSBatchJob][source]

Use the job.job_name to find any jobs_db records and check the job status on the latest submission of any job matching the job-name. For any job that has a matching job-name in the jobs_db, check whether the job has already run and SUCCEEDED. Any jobs that have already run and SUCCEEDED are not returned.

Return all jobs that have not run yet or have not SUCCEEDED. Note that any jobs subsequently input to the job-manager will not re-run if they already have a job.job_id, those jobs will be monitored until complete. To re-run jobs that are returned from this filter function, first use job.reset() before passing the jobs to the job-manager.

Note

The input jobs are not modified in any way in this function, i.e. there are no updates from the jobs_db applied to the input jobs.

To recover jobs from the jobs_db, use - AWSBatchDB.find_jobs_to_run() - AWSBatchDB.jobs_db.all()

abstract async remove_by_job_id(job_id: str)Optional[Dict][source]

Remove any job matching the jobId

Parameters

job_id – a batch jobId

Returns

a deleted document

abstract async remove_by_job_name(job_name: str)Set[str][source]

Remove any jobs matching the jobName

Parameters

job_name – a batch jobName

Returns

a set of deleted job-ids

abstract async save_job(job: aio_aws.aws_batch_models.AWSBatchJob)Optional[str][source]

Insert or update a job (if it has a job_id)

Parameters

job – an AWSBatchJob

Returns

the jobId if the job is saved

abstract async save_job_logs(job: aio_aws.aws_batch_models.AWSBatchJob)List[int][source]

Insert or update job logs (if the job has a job_id)

Parameters

job – an AWSBatchJob

Returns

a List[tinydb.database.Document.doc_id]

class aio_aws.aio_aws_batch_db.AioAWSBatchRedisDB(redis_url: str = 'redis://127.0.0.1:6379')[source]

Bases: aio_aws.aio_aws_batch_db.AioAWSBatchDB

AWS Batch job databases - aioredis implementation

The jobs and logs are kept in separate DBs so that performance on the jobs_db is not compromised by too much data from job logs. This makes it possible to use the jobs_db without capturing logs as well.

async all_job_ids()Set[str][source]

Collect all jobIds.

async all_jobs()List[aio_aws.aws_batch_models.AWSBatchJob][source]

Collect all jobs.

Warning: this could exceed memory, try to use the gen_all_jobs() wherever possible.

async count_by_job_status()collections.Counter[source]

Count all jobs by jobStatus

Returns

a Counter of jobs by job status (could contain multiple entries for the same jobName, if it is run more than once)

property db_alive
property db_info
async db_save()[source]
property db_semaphore

A semaphore to limit requests to the db

async find_by_job_id(job_id: str)Optional[Dict][source]

Find one job by the jobId

Parameters

job_id – a batch jobId

Returns

the job data or None

async find_by_job_name(job_name: str)List[Dict][source]

Find any jobs matching the jobName

Parameters

job_name – a batch jobName

Returns

a list of dictionaries containing job data

async find_by_job_status(job_states: List[str])List[aio_aws.aws_batch_models.AWSBatchJob][source]

Find any jobs matching any jobStatus values

Parameters

job_states – a list of valid job status values

Returns

a list of AWSBatchJob (could contain multiple entries with the same jobName if it is run multiple times with any jobStatus in the jobStatus values)

async find_job_logs(job_id: str)Optional[Dict][source]

Find job logs by job_id

Parameters

job_id – str

Returns

job logs document

async find_jobs_to_run()List[aio_aws.aws_batch_models.AWSBatchJob][source]

Find all jobs that have not SUCCEEDED.

async find_latest_job_name(job_name: str)Optional[aio_aws.aws_batch_models.AWSBatchJob][source]

Find the latest job matching the jobName, based on the “createdAt” time stamp.

Parameters

job_name – a batch jobName

Returns

the latest job record available

gen_all_jobs()AsyncIterator[aio_aws.aws_batch_models.AWSBatchJob][source]

Generate all jobs.

gen_job_ids()AsyncIterator[str][source]

Generate all jobIds.

async group_by_job_status()Dict[str, List[Tuple[str, str, str]]][source]

Group all jobId by jobStatus

Returns

a dictionary of job info by job status (could contain multiple entries with the same jobName if it is run multiple times); the dictionary values contain a list of job information tuples, e.g.

{ status: [ (job.job_id, job.job_name, job.status), ] }

property jobs_db
property jobs_db_alive
async jobs_recovery(jobs: List[aio_aws.aws_batch_models.AWSBatchJob])List[aio_aws.aws_batch_models.AWSBatchJob][source]

Use the job.job_name to find any jobs_db records to recover job data.

async jobs_to_run(jobs: List[aio_aws.aws_batch_models.AWSBatchJob])List[aio_aws.aws_batch_models.AWSBatchJob][source]

Use the job.job_name to find any jobs_db records and check the job status on the latest submission of any job matching the job-name. For any job that has a matching job-name in the jobs_db, check whether the job has already run and SUCCEEDED. Any jobs that have already run and SUCCEEDED are not returned.

Return all jobs that have not run yet or have not SUCCEEDED. Note that any jobs subsequently input to the job-manager will not re-run if they already have a job.job_id, those jobs will be monitored until complete. To re-run jobs that are returned from this filter function, first use job.reset() before passing the jobs to the job-manager.

Note

The input jobs are not modified in any way in this function, i.e. there are no updates from the jobs_db applied to the input jobs.

To recover jobs from the jobs_db, use - AWSBatchDB.find_jobs_to_run() - AWSBatchDB.jobs_db.all()

property logs_db
property logs_db_alive
redis_url: str = 'redis://127.0.0.1:6379'
async remove_by_job_id(job_id: str)Optional[Dict][source]

Remove any job matching the jobId

Parameters

job_id – a batch jobId

Returns

a deleted document

async remove_by_job_name(job_name: str)Set[str][source]

Remove any jobs matching the jobName

Parameters

job_name – a batch jobName

Returns

a set of deleted job-ids

async save_job(job: aio_aws.aws_batch_models.AWSBatchJob)Optional[str][source]

Insert or update a job (if it has a job_id)

Parameters

job – an AWSBatchJob

Returns

the jobId if the job is saved

async save_job_logs(job: aio_aws.aws_batch_models.AWSBatchJob)List[int][source]

Insert or update job logs (if the job has a job_id)

Parameters

job – an AWSBatchJob

Returns

a List[tinydb.database.Document.doc_id]

class aio_aws.aio_aws_batch_db.AioAWSBatchTinyDB(jobs_db_file: str = '/tmp/aws_batch_jobs.json', logs_db_file: str = '/tmp/aws_batch_logs.json')[source]

Bases: aio_aws.aio_aws_batch_db.AioAWSBatchDB

AWS Batch job databases - TinyDB implementation

The jobs and logs are kept in separate DBs so that performance on the jobs_db is not compromised by too much data from job logs. This makes it possible to use the jobs_db without capturing logs as well.

The batch data is a TinyDB json file, e.g.

>>> import json
>>> jobs_db_file = '/tmp/aio_batch_jobs.json'
>>> logs_db_file = '/tmp/aio_batch_logs.json'

>>> with open(jobs_db_file) as job_file:
...     batch_data = json.load(job_file)
...
>>> len(batch_data['aws-batch-jobs'])
5

>>> import tinydb
>>> tinydb.TinyDB.DEFAULT_TABLE = "aws-batch-jobs"
>>> tinydb.TinyDB.DEFAULT_TABLE_KWARGS = {"cache_size": 0}
>>> # the jobs and logs are kept in separate DBs so
>>> # that performance on the jobs_db is not compromised
>>> # by too much data from job logs.
>>> jobs_db = tinydb.TinyDB(jobs_db_file)
>>> logs_db = tinydb.TinyDB(logs_db_file)
async all_job_ids()Set[str][source]

Collect all jobIds.

async all_jobs()List[aio_aws.aws_batch_models.AWSBatchJob][source]

Collect all jobs.

Warning: this could exceed memory, try to use the gen_all_jobs() wherever possible.

async count_by_job_status()collections.Counter[source]

Count all jobs by jobStatus

Returns

a Counter of jobs by job status (could contain multiple entries for the same jobName, if it is run more than once)

property db_semaphore

A semaphore to limit requests to the db

async find_by_job_id(job_id: str)Optional[tinydb.database.Document][source]

Find one job by the jobId

Parameters

job_id – a batch jobId

Returns

the AWSBatchJob.job_data() or None

async find_by_job_name(job_name: str)List[tinydb.database.Document][source]

Find any jobs matching the jobName

Parameters

job_name – a batch jobName

Returns

a list of documents containing AWSBatchJob.job_data()

async find_by_job_status(job_states: List[str])List[aio_aws.aws_batch_models.AWSBatchJob][source]

Find any jobs matching any jobStatus values

Parameters

job_states – a list of valid job status values

Returns

a list of AWSBatchJob (could contain multiple entries with the same jobName if it is run multiple times with any jobStatus in the jobStatus values)

async find_job_logs(job_id: str)Optional[tinydb.database.Document][source]

Find job logs by job_id

Parameters

job_id – str

Returns

a tinydb.database.Document with job logs

async find_jobs_to_run()List[aio_aws.aws_batch_models.AWSBatchJob][source]

Find all jobs that have not SUCCEEDED. Note that any jobs handled by the job-manager will not re-run if they have a job.job_id, those jobs will be monitored until complete.

async find_latest_job_name(job_name: str)Optional[aio_aws.aws_batch_models.AWSBatchJob][source]

Find the latest job matching the jobName, based on the “createdAt” time stamp.

Parameters

job_name – a batch jobName

Returns

the latest job record available

gen_all_jobs()AsyncIterator[aio_aws.aws_batch_models.AWSBatchJob][source]

Generate all jobs.

gen_job_ids()AsyncIterator[str][source]

Generate all jobIds.

classmethod get_paged_batch_db(db_path: pathlib.Path, offset: int, limit: int)[source]
async group_by_job_status()Dict[str, List[Tuple[str, str, str]]][source]

Group all jobId by jobStatus

Returns

a dictionary of job info by job status (could contain multiple entries with the same jobName if it is run multiple times); the dictionary values contain a list of job information tuples, e.g.

{ status: [ (job.job_id, job.job_name, job.status), ] }

jobs_db_file: str = '/tmp/aws_batch_jobs.json'

a file used for :py:class::TinyDB(jobs_db_file)

async jobs_recovery(jobs: List[aio_aws.aws_batch_models.AWSBatchJob])List[aio_aws.aws_batch_models.AWSBatchJob][source]

Use the job.job_name to find any jobs_db records to recover job data.

async jobs_to_run(jobs: List[aio_aws.aws_batch_models.AWSBatchJob])List[aio_aws.aws_batch_models.AWSBatchJob][source]

Use the job.job_name to find any jobs_db records and check the job status on the latest submission of any job matching the job-name. For any job that has a matching job-name in the jobs_db, check whether the job has already run and SUCCEEDED. Any jobs that have already run and SUCCEEDED are not returned.

Return all jobs that have not run yet or have not SUCCEEDED. Note that any jobs subsequently input to the job-manager will not re-run if they already have a job.job_id, those jobs will be monitored until complete. To re-run jobs that are returned from this filter function, first use job.reset() before passing the jobs to the job-manager.

Note

The input jobs are not modified in any way in this function, i.e. there are no updates from the jobs_db applied to the input jobs.

To recover jobs from the jobs_db, use - AWSBatchDB.find_jobs_to_run() - AWSBatchDB.jobs_db.all()

logs_db_file: str = '/tmp/aws_batch_logs.json'

a file used for :py:class::TinyDB(logs_db_file)

async remove_by_job_id(job_id: str)Optional[tinydb.database.Document][source]

Remove any job matching the jobId

Parameters

job_id – a batch jobId

Returns

a deleted document

async remove_by_job_name(job_name: str)Set[str][source]

Remove any jobs matching the jobName

Parameters

job_name – a batch jobName

Returns

a set of deleted job-ids

async save_job(job: aio_aws.aws_batch_models.AWSBatchJob)Optional[str][source]

Insert or update a job (if it has a job_id)

Parameters

job – an AWSBatchJob

Returns

the jobId if the job is saved

async save_job_logs(job: aio_aws.aws_batch_models.AWSBatchJob)Optional[str][source]

Insert or update job logs (if the job has a job_id)

Parameters

job – an AWSBatchJob

Returns

a job.job_id if the logs are saved

AioAWS Batch Models

Batch Job metadata models.

class aio_aws.aws_batch_models.AWSBatchJob(job_name: str, job_queue: str, job_definition: str, command: Optional[List[str]] = None, depends_on: Optional[List[Dict]] = None, container_overrides: Optional[Dict] = None, job_id: Optional[str] = None, status: Optional[str] = None, job_tries: Optional[List[str]] = None, num_tries: int = 0, max_tries: int = 4, job_submission: Optional[Dict] = None, job_description: Optional[Dict] = None, logs: Optional[List[Dict]] = None)[source]

Bases: object

AWS Batch job

Creating an AWSBatchJob instance does not run anything, it’s simply a dataclass to retain and track job attributes. There are no instance methods to run a job, the instances are passed to async coroutine functions.

Replace ‘command’ with ‘container_overrides’ dict for more options; do not use ‘command’ together with ‘container_overrides’; if both are given, any valid ‘command’ value will replace the container_overrides[‘command’].

Parameters
  • job_name – A job jobName (truncated to 128 characters).

  • job_definition – A job job_definition.

  • job_queue – A batch queue.

  • command – A container command.

  • depends_on

    list of dictionaries like:

    [
      {'jobId': 'abc123', ['type': 'N_TO_N' | 'SEQUENTIAL'] },
    ]
    

    type is optional, used only for job arrays

  • container_overrides

    a dictionary of container overrides. Overrides include ‘instanceType’, ‘environment’, and ‘resourceRequirements’.

    If the command parameter is defined, it overrides container_overrides[‘command’]

    To override VCPU and MEMORY requirements in the ResourceRequirement of the job definition, a ResourceRequirement must be specified in the SubmitJob request.

    https://docs.aws.amazon.com/batch/latest/APIReference/API_ResourceRequirement.html

    VCPU (int): This parameter maps to CpuShares in the Create a container section of the Docker Remote API and the –cpu-shares option to docker run. Each vCPU is equivalent to 1,024 CPU shares. This parameter is supported for jobs that run on EC2 resources, but isn’t supported for jobs that run on Fargate resources.

    MEMORY (int): This parameter indicates the amount of memory (in MiB) that is reserved for the job. This parameter is supported for jobs that run on EC2 resources, but isn’t supported for jobs that run on Fargate resources.

    container_overrides = {
         "resourceRequirements": [
             {
                 "type": "VCPU",
                 "value": "2"
             },
             {
                 "type": "MEMORY",
                 "value": str(int(gib_to_mib(6)))
             }
         ]
     }
    

  • max_tries – an optional limit to the number of job retries, which can apply in the job-manager function to any job with a SPOT failure only and it applies regardless of the job-definition settings.

STATES = ['SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING', 'RUNNING', 'SUCCEEDED', 'FAILED']
allow_submit_job()bool[source]

Logic to determine whether a job can be submitted or resubmitted:

  • jobs with an existing job.job_id should skip submission to avoid resubmission for the same job

  • jobs with too many tries cannot be resubmitted

The AWSBatchJob.reset() can be applied in order to resubmit any job that is not allowed.

Returns

True if it is allowed

command: List[str] = None
container_overrides: Dict = None
property created

Timestamp (milliseconds since the epoch) for job createdAt; this depends on updates to the jobDescription.

The Unix timestamp (in milliseconds) for when the job was created. For non-array jobs and parent array jobs, this is when the job entered the SUBMITTED state (at the time SubmitJob was called). For array child jobs, this is when the child job was spawned by its parent and entered the PENDING state.

property created_datetime

Datetime for job createdAt; this depends on updates to the jobDescription.

property db_data

AWS Batch job data for state machine persistence

property db_logs_data

AWS Batch job with logs persistence

depends_on: List[Dict] = None
property elapsed

Timestamp (milliseconds since the epoch) for job elapsed time; this depends on updates to the jobDescription; it is the difference between createdAt and stoppedAt (this can include long periods in a job queue).

job_definition: str
job_description: Optional[Dict] = None
job_for_status(job_states: List[str])[source]
job_id: Optional[str] = None
job_name: str
job_queue: str
job_submission: Optional[Dict] = None
job_tries: List[str] = None
logs: Optional[List[Dict]] = None
max_tries: int = 4
num_tries: int = 0
property params

AWS Batch parameters for job submission

reset()[source]

Clear the job_id and all related job data

property runtime

Timestamp (milliseconds since the epoch) for job run time; this depends on updates to the jobDescription; it is the difference between startedAt and stoppedAt.

property spinup

Timestamp (milliseconds since the epoch) for job startup time; this depends on updates to the jobDescription; it is the difference between createdAt and startedAt.

property started

Timestamp (milliseconds since the epoch) for job startedAt; this depends on updates to the jobDescription.

The Unix timestamp (in milliseconds) for when the job was started (when the job transitioned from the STARTING state to the RUNNING state). This parameter isn’t provided for child jobs of array jobs or multi-node parallel jobs.

property started_datetime

Datetime for job startedAt; this depends on updates to the jobDescription.

status: Optional[str] = None
property stopped

Timestamp (milliseconds since the epoch) for job stoppedAt; this depends on updates to the jobDescription.

The Unix timestamp (in milliseconds) for when the job was stopped (when the job transitioned from the RUNNING state to a terminal state, such as SUCCEEDED or FAILED).

property stopped_datetime

Datetime for job stoppedAt; this depends on updates to the jobDescription.

property submitted

Timestamp (milliseconds since the epoch) for job submission

property submitted_datetime

Datetime for job submission; this depends on setting the job_submission.

class aio_aws.aws_batch_models.AWSBatchJobDescription(jobName: str = None, jobId: str = None, jobQueue: str = None, status: str = None, attempts: List[Dict] = None, statusReason: str = None, createdAt: int = None, startedAt: int = None, stoppedAt: int = None, dependsOn: List[str] = None, jobDefinition: str = None, parameters: Dict = None, container: Dict = None, timeout: Dict = None)[source]

Bases: object

attempts: List[Dict] = None
container: Dict = None
createdAt: int = None
dependsOn: List[str] = None
jobDefinition: str = None
jobId: str = None
jobName: str = None
jobQueue: str = None
parameters: Dict = None
startedAt: int = None
status: str = None
statusReason: str = None
stoppedAt: int = None
timeout: Dict = None
class aio_aws.aws_batch_models.AWSBatchJobStates(value)[source]

Bases: enum.Enum

An enumeration.

FAILED = 7
PENDING = 2
RUNNABLE = 3
RUNNING = 5
STARTING = 4
SUBMITTED = 1
SUCCEEDED = 6
aio_aws.aws_batch_models.gb_to_gib(gb: Union[int, float])float[source]

Gigabyte (Gb) to Gibibyte (GiB) :param gb: Gigabytes (Gb) :return: Gibibyte (GiB)

aio_aws.aws_batch_models.gb_to_mib(gb: Union[int, float])float[source]

Gigabyte (Gb) to Mebibyte (MiB) :param gb: Gigabytes (Gb) :return: Mebibytes (MiB)

aio_aws.aws_batch_models.gib_to_mib(gib: Union[int, float])float[source]

Gibibyte (GiB) to Mebibytes (GiB) :param gib: Gibibyte (GiB) :return: Mebibytes (MiB)

AioAWS Lambda

Manually create a ‘lambda_dev’ function with simple code like:

# lambda_dev/lambda_function.py

import json

def lambda_handler(event, context):
    csv = ", ".join([str(i) for i in range(10)])
    print(csv)  # to log something
    return {
        'statusCode': 200,
        'body': json.dumps(csv)
    }

The aio_aws_lambda.py module has a main script to call this simple lambda function. This lambda function easily runs within a 128 Mb memory allocation and it runs in less time than the minimal billing period, so it’s as cheap as it can be and could fit within a monthly free quota.

$ ./aio_aws/aio_aws_lambda.py
Test async lambda functions
[INFO]  2020-04-07T03:10:21.741Z  aio-aws:aio_lambda_invoke:114  AWS Lambda (lambda_dev) invoked OK
1 lambdas finished in 0.31 seconds.

$ N_LAMBDAS=1000 ./aio_aws/aio_aws_lambda.py
# snipped logging messages
1000 lambdas finished in 14.95 seconds.
class aio_aws.aio_aws_lambda.AWSLambdaFunction(name: str, type: str = 'RequestResponse', log_type: str = 'None', context: Optional[str] = None, payload: bytes = b'', qualifier: Optional[str] = None, response: Optional[Dict] = None, data: Optional[bytes] = None)[source]

Bases: object

AWS Lambda Function

Creating an AWSLambdaFunction instance does not create or invoke anything, it’s a dataclass to retain and track function parameters and response data.

Parameters
  • name – A lambda FunctionName (truncated to 64 characters).

  • type – the type of function invocation (“RequestResponse”, “Event”, “DryRun”)

  • log_type – the type of function logging (“None” or “Tail”)

  • context – the client context; for an example of a ClientContext JSON, see PutEvents in the Amazon Mobile Analytics API Reference and User Guide. The ClientContext JSON must be base64-encoded and has a maximum size of 3583 bytes.

  • payload – an input payload (bytes or seekable file-like object); JSON that you want to provide to your Lambda function as input.

  • qualifier – an optional function version or alias name; If you specify a function version, the API uses the qualified function ARN to invoke a specific Lambda function. If you specify an alias name, the API uses the alias ARN to invoke the Lambda function version to which the alias points. If you don’t provide this parameter, then the API uses unqualified function ARN which results in invocation of the $LATEST version.

LOG_TYPES = ['None', 'Tail']
TYPES = ['RequestResponse', 'Event', 'DryRun']
property content_length
property content_type
context: str = None
data: bytes = None
property error

This could be a lambda function error or a client error

async invoke(config: aio_aws.aio_aws_config.AioAWSConfig, lambda_client: aiobotocore.client.AioBaseClient)aio_aws.aio_aws_lambda.AWSLambdaFunction[source]

Asynchronous coroutine to invoke a lambda function; this updates the response and calls the py:meth:.read_response method to handle the response.

Parameters
  • config – aio session and client settings

  • lambda_client – aio client for lambda

Returns

a lambda response

Raises

botocore.exceptions.ClientError, botocore.exceptions.ParamValidationError

property json
log_type: str = 'None'
property logs
name: str
property params

AWS Lambda parameters to invoke function

payload: bytes = b''
qualifier: str = None
async read_response()[source]

Asynchronous coroutine to read a lambda response; this updates the data attribute.

Raises

botocore.exceptions.ClientError, botocore.exceptions.ParamValidationError

response: Dict = None
property response_headers
property response_metadata
property status_code
property text
type: str = 'RequestResponse'
async aio_aws.aio_aws_lambda.run_lambda_function_thread_pool(lambda_functions: List[aio_aws.aio_aws_lambda.AWSLambdaFunction], n_tasks: int = 4)[source]
async aio_aws.aio_aws_lambda.run_lambda_functions(lambda_functions: List[aio_aws.aio_aws_lambda.AWSLambdaFunction])[source]

Use some default config settings to run lambda functions

lambda_funcs = []
for i in range(5):
    event = {"i": i}
    payload = json.dumps(event).encode()
    func = AWSLambdaFunction(name="lambda_dev", payload=payload)
    lambda_funcs.append(func)
asyncio.run(run_lambda_functions(lambda_funcs))
for func in lambda_funcs:
    assert response_success(func.response)
Returns

it returns nothing, the lambda function will contain the results of any lambda response in func.response and func.data

AioAWS S3

Example code for async AWS S3 services. The aiobotocore library wraps a release of botocore to patch it with features for async coroutines using asyncio and aiohttp. The simple example code iterates on a list of buckets or objects to issue a HEAD request, as a simple way to determine whether access is permitted. The example compares an async approach (using aiobotocore) vs a sequential blocking approach (using botocore). The async approach uses a client connection limiter, based on asyncio.Semaphore(20).

$ ./aio_aws/aio_aws_s3.py

aio-aws check an s3 object that exists
GET HEAD s3://noaa-goes16/index.html -> {'ResponseMetadata': {'RequestId': 'A2F4DF282C84341B', 'HostId': 'Rt45GH1taNRVUAPTg8XJfs4KLvri0OKos/6Wohp5tAfQa+moUT/9mC/0Wa9cYVQZcMAWKtIYfkE=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'Rt45GH1taNRVUAPTg8XJfs4KLvri0OKos/6Wohp5tAfQa+moUT/9mC/0Wa9cYVQZcMAWKtIYfkE=', 'x-amz-request-id': 'A2F4DF282C84341B', 'date': 'Tue, 03 Mar 2020 19:22:21 GMT', 'last-modified': 'Wed, 12 Feb 2020 19:14:27 GMT', 'etag': '"8f9810548f75fd8b8a16431c74ad54ac"', 'content-encoding': 'text/html', 'accept-ranges': 'bytes', 'content-type': 'text/html', 'content-length': '32356', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'AcceptRanges': 'bytes', 'LastModified': datetime.datetime(2020, 2, 12, 19, 14, 27, tzinfo=tzutc()), 'ContentLength': 32356, 'ETag': '"8f9810548f75fd8b8a16431c74ad54ac"', 'ContentEncoding': 'text/html', 'ContentType': 'text/html', 'Metadata': {}}

aio-aws check an s3 object that is missing; it raises.
GET HEAD s3://noaa-goes16/missing.html -> 404: object missing

aio-aws find all buckets that allow access.
queried 72 buckets
found 69 accessible buckets
finished in 2.42 seconds.

aio-aws collection of all objects in a bucket-prefix.
found 9015 s3 objects
finished in 4.43 seconds.

aws-sequential find all buckets that allow access.
queried 72 buckets
found 69 accessible buckets
finished in 31.11 seconds.

aws-sequential collection for all objects in a bucket-prefix
found 9015 s3 objects
finished in 35.72 seconds.

Checking equality of collections for aio-aws vs. aws-sync
The collections for aio-aws vs. aws-sync match OK
class aio_aws.aio_aws_s3.S3Parts(bucket: str, key: str, PROTOCOL: str = 's3://', DELIMITER: str = '/')[source]

Bases: object

DELIMITER: str = '/'
PROTOCOL: str = 's3://'
bucket: str
key: str
static parse_s3_uri(uri: str)aio_aws.aio_aws_s3.S3Parts[source]
property s3_uri
async aio_aws.aio_aws_s3.aio_s3_bucket_access(bucket_name: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient)Tuple[str, bool][source]

Asynchronous coroutine to issue a HEAD request to check if s3 bucket access is allowed.

Parameters
  • bucket_name – an s3 bucket name

  • config – an AioAWSConfig

  • s3_client – an AioBaseClient for s3

Returns

a tuple of (bucket_name: str, access: bool)

async aio_aws.aio_aws_s3.aio_s3_bucket_head(bucket_name: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient)Dict[source]

Asynchronous coroutine to issue a HEAD request for s3 bucket.

Parameters
  • bucket_name – an s3 bucket name

  • config – an AioAWSConfig

  • s3_client – an AioBaseClient for s3

Returns

a response to a HEAD request

Raises

botocore.exceptions.ClientError

async aio_aws.aio_aws_s3.aio_s3_buckets_access(buckets: List[str], config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient)Dict[str, bool][source]

Asynchronous coroutine to issue HEAD requests on all available s3 buckets to check if each bucket allows access.

Parameters
  • buckets – a list of bucket names to check; the default is to issue a request for all buckets in the session region

  • config – an AioAWSConfig

  • s3_client – an AioBaseClient for s3

Returns

dict of {bucket_name: str, access: bool}

async aio_aws.aio_aws_s3.aio_s3_buckets_list(config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient)Dict[source]

Asynchronous coroutine to list all buckets.

Parameters
  • config – an AioAWSConfig

  • s3_client – an AioBaseClient for s3

Returns

a response to a ListBuckets request

async aio_aws.aio_aws_s3.aio_s3_object_access(s3_uri: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient)Tuple[str, bool][source]

Asynchronous coroutine to issue a HEAD request to check if s3 object access is allowed.

Parameters
  • s3_uri – an s3 URI

  • config – an AioAWSConfig

  • s3_client – an AioBaseClient for s3

Returns

a tuple of (s3_uri: str, access: bool)

async aio_aws.aio_aws_s3.aio_s3_object_head(s3_uri: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient)Dict[source]

Asynchronous coroutine to issue a HEAD request for s3 object.

Parameters
  • s3_uri – an s3 URI

  • config – an AioAWSConfig

  • s3_client – an AioBaseClient for s3

Returns

a response to a HEAD request

Raises

botocore.exceptions.ClientError

async aio_aws.aio_aws_s3.aio_s3_objects_access(s3_uris: List[str], config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient)Dict[str, bool][source]

Asynchronous coroutine to issue HEAD requests on all available s3 objects to check if each s3_object allows access.

Parameters
  • s3_uris – a list of s3 uris in the form ‘s3://bucket-name/key’

  • config – an AioAWSConfig

  • s3_client – an AioBaseClient for s3

Returns

dict of {s3_uri: str, access: bool}

async aio_aws.aio_aws_s3.aio_s3_objects_list(bucket_name: str, bucket_prefix: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient)List[Dict][source]

Asynchronous coroutine to collect all objects in a bucket prefix.

Parameters
  • bucket_name – param passed to s3_client.list_objects_v2 ‘Bucket’

  • bucket_prefix – param passed to s3_client.list_objects_v2 ‘Prefix’

  • config – an AioAWSConfig

  • s3_client – an AioBaseClient for s3

Returns

a list of s3 object data, e.g.

>>> aio_s3_objects[0]
{'ETag': '"192e29f360ea8297b5876b33b8419741"',
 'Key': 'ABI-L2-ADPC/2019/337/13/OR_ABI-L2-ADPC-M6_G16_s20193371331167_e20193371333539_c20193371334564.nc',
 'LastModified': datetime.datetime(2019, 12, 3, 14, 27, 5, tzinfo=tzutc()),
 'Size': 892913,
 'StorageClass': 'INTELLIGENT_TIERING'}

aio_aws.aio_aws_s3.aws_s3_bucket_access(bucket_name: str, s3_client: botocore.client.BaseClient)bool[source]

Check access to an S3 bucket by issuing a HEAD request

Parameters
  • bucket_name – An s3 bucket name

  • s3_client – botocore.client.BaseClient for s3

Returns

a tuple of (bucket_name: str, access: bool)

aio_aws.aio_aws_s3.aws_s3_buckets_access(s3_client: botocore.client.BaseClient)Dict[str, bool][source]

A concurrent.futures approach to list all s3 buckets and issue a HEAD request to check if access is allowed. This is used in performance comparisons between botocore and aiobotocore functionality.

Parameters

s3_client – botocore.client.BaseClient for s3

Returns

a tuple of (bucket_name: str, access: bool)

S3 AIO

These functions provide non-blocking serialization to files and s3 for GeoJSON, GeoJSONSeq, JSON, and YAML.

async aio_aws.s3_aio.geojson_s3_dump(geojson_data: Any, s3_uri: str, s3_client: aiobotocore.client.AioBaseClient)Optional[str][source]

Write GeoJSON to an s3 URI

Parameters
  • geojson_data – an object to json.dump

  • s3_uri – a fully qualified S3 URI for an s3 object

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

the s3 URI on success

async aio_aws.s3_aio.geojson_s3_load(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient)Dict[source]

Read GeoJSON data from an s3 object

Parameters
  • s3_uri – a fully qualified S3 URI for an s3 object

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

geojson data

async aio_aws.s3_aio.geojsons_dump(geojson_features: List[Dict], geojsons_file: Union[pathlib.Path, str])Optional[Union[pathlib.Path, str]][source]

Write a GeoJSON Text Sequence file

Parameters
  • geojson_features – a list of geojson features; from any feature collection, this is geojson_collection[“features”]

  • geojsons_file – a file path to write

Returns

if the dump succeeds, return the geojsons_file, or None

async aio_aws.s3_aio.geojsons_s3_dump(geojson_features: List[Dict], s3uri: str, s3_client: aiobotocore.client.AioBaseClient)Optional[str][source]

Write GeoJSON Text Sequence files to an s3 URI

[GeoJSON Text Sequences](https://tools.ietf.org/html/rfc8142) are lines of geojson features that are designed for streaming operations on large datasets. These files can be loaded by geopandas, using fiona driver=”GeoJSONSeq”, which can be auto-detected. For example:

import geopandas as gpd

s3_uri = "s3://your-bucket/prefix/input.geojsons"
gdf = gpd.read_file(s3_uri)
Parameters
  • geojson_features – a list of geojson features; from any feature collection, this is geojson_collection[“features”]

  • s3uri – a fully qualified S3 URI for an s3 object

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

the s3 URI on success

async aio_aws.s3_aio.geojsons_s3_load(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient)List[Dict][source]

Read GeoJSON Text Sequence data from an s3 object

Parameters
  • s3_uri – a fully qualified S3 URI for an s3 object

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

geojson features

async aio_aws.s3_aio.get_s3_content(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient)[source]

Read an s3 object

Parameters
  • s3_uri – a fully qualified S3 URI for an s3 object

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

the data from the s3 object

async aio_aws.s3_aio.json_dump(data: Any, file: Union[pathlib.Path, str])Optional[Union[pathlib.Path, str]][source]

Write JSON to a file

Parameters
  • data – any data compatible with json.dumps

  • file – a file path to write

Returns

if the dump succeeds, return the file, or None

async aio_aws.s3_aio.json_s3_dump(json_data: Any, s3_uri: str, s3_client: aiobotocore.client.AioBaseClient)Optional[str][source]

Write JSON to an s3 URI

Parameters
  • json_data – an object to json.dump

  • s3_uri – a fully qualified S3 URI for the s3 object to write

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

the s3 URI on success

async aio_aws.s3_aio.json_s3_load(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient)Any[source]

Read JSON data from an s3 object

Parameters
  • s3_uri – a fully qualified S3 URI for an s3 object

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

data from the json load

async aio_aws.s3_aio.put_s3_content(data_file: str, s3_uri: str, s3_client: aiobotocore.client.AioBaseClient)Optional[str][source]

Write a file to an s3 object

Parameters
  • data_file – a data file

  • s3_uri – a fully qualified S3 URI for an s3 object

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

the s3 URI on success

aio_aws.s3_aio.run_s3_load_files(s3_uris: List[str], *args, **kwargs)Dict[str, Optional[Any]][source]

Collect data from S3 files in JSON or YAML formats

Parameters

s3_uris – a list of S3 URIs

Returns

a Dict[s3_uri: s3_data] for all the s3 URIs that can be read successfully

aio_aws.s3_aio.s3_aio_client(*args, **kwargs)aiobotocore.client.AioBaseClient[source]

This creates a aiobotocore.client.AioBaseClient that uses aiohttp for asynchronous s3 requests

Returns

a aiobotocore.client.AioBaseClient for s3

async aio_aws.s3_aio.s3_file_info(s3_uri: Union[aio_aws.s3_uri.S3URI, str], s3_client: aiobotocore.client.AioBaseClient)aio_aws.s3_uri.S3Info[source]

Collect data from an S3 HEAD request for an S3URI

Parameters
  • s3_uri – a fully qualified S3 URI for the s3 object to read

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

an S3Info object with HEAD data on success; on failure the S3Info object has no HEAD data

async aio_aws.s3_aio.s3_files_info(s3_uris: List[Union[aio_aws.s3_uri.S3URI, str]], s3_client: aiobotocore.client.AioBaseClient)List[aio_aws.s3_uri.S3Info][source]

Collect data from S3 HEAD requests for many S3URI

Parameters
  • s3_uris – a list of S3URI

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

a list of S3Info object with HEAD data on success; on failure the S3Info object has no HEAD data

async aio_aws.s3_aio.s3_load_file(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient)Tuple[str, Optional[Any]][source]

Load various file types from s3; it supports files with a known file suffix, such as “.json”, “.geojson”, “.geojsons”, “.yaml”, “.yml”

Parameters
  • s3_uri – a fully qualified S3 URI for an s3 object

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

a Tuple[s3_uri, Optional[s3_data]] where the s3_data could be None if the file type is not recognized or a file read fails

async aio_aws.s3_aio.s3_load_files(s3_uris: List[str], *args, s3_client: Optional[aiobotocore.client.AioBaseClient] = None, **kwargs)Dict[str, Optional[Any]][source]

Collect data from S3 files in JSON or YAML formats

Parameters
  • s3_uris – a list of S3 URIs

  • s3_client – an optional aiobotocore.client.AioBaseClient for s3

Returns

a Dict[s3_uri: s3_data] for all the s3 URIs that can be read successfully

async aio_aws.s3_aio.yaml_dump(data: Any, file: Union[pathlib.Path, str])Optional[Union[pathlib.Path, str]][source]

Write YAML to a file

Parameters
  • data – any data compatible with yaml.safe_dump

  • file – a file path to write

Returns

if the dump succeeds, return the file, or None

async aio_aws.s3_aio.yaml_s3_dump(yaml_data: Any, s3_uri: str, s3_client: aiobotocore.client.AioBaseClient)Optional[str][source]

Write YAML to an s3 URI

Parameters
  • yaml_data – an object to yaml.dump

  • s3_uri – a fully qualified S3 URI for an s3 object

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

the s3 URI on success

async aio_aws.s3_aio.yaml_s3_load(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient)Any[source]

Read YAML data from an s3 object

Parameters
  • s3_uri – a fully qualified S3 URI for an s3 object

  • s3_client – a required aiobotocore.client.AioBaseClient for s3

Returns

data from the yaml load

S3 IO

These functions provide blocking serialization to files and s3 for GeoJSON, GeoJSONSeq, JSON, and YAML.

aio_aws.s3_io.geojson_s3_dump(geojson_data: Any, s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None)Optional[str][source]

Write GeoJSON to an s3 URI

Parameters
  • geojson_data – an object to json.dump

  • s3_uri – a fully qualified S3 URI for the s3 object to write

  • s3_client – an optional botocore.client.BaseClient for s3

Returns

the s3 URI on success

aio_aws.s3_io.geojson_s3_load(s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None)Dict[source]

Read GeoJSON data from an s3 object

Parameters
  • s3_uri – a fully qualified S3 URI for the s3 object to read

  • s3_client – an optional botocore.client.BaseClient for s3

Returns

geojson data

aio_aws.s3_io.geojsons_dump(geojson_features: List[Dict], geojsons_file: Union[pathlib.Path, str])Optional[Union[pathlib.Path, str]][source]
Parameters
  • geojson_features – a list of geojson features; from any feature collection, this is geojson_collection[“features”]

  • geojsons_file – a file path to write

Returns

if the dump succeeds, return the geojsons_file, or None

aio_aws.s3_io.geojsons_s3_dump(geojson_features: List[Dict], s3uri: str, s3_client: Optional[botocore.client.BaseClient] = None)Optional[str][source]

Write GeoJSON Text Sequence files to an s3 URI

[GeoJSON Text Sequences](https://tools.ietf.org/html/rfc8142) are lines of geojson features that are designed for streaming operations on large datasets. These files can be loaded by geopandas, using fiona driver=”GeoJSONSeq”, which can be auto-detected. For example:

import geopandas as gpd

s3_uri = "s3://your-bucket/prefix/input.geojsons"
gdf = gpd.read_file(s3_uri)
Parameters
  • geojson_features – a list of geojson features; from any feature collection, this is geojson_collection[“features”]

  • s3uri – a fully qualified S3 URI for the s3 object to write

  • s3_client – an optional botocore.client.BaseClient for s3

Returns

the s3 URI on success

aio_aws.s3_io.geojsons_s3_load(s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None)List[Dict][source]

Read GeoJSON Text Sequence data from an s3 object

Parameters
  • s3_uri – a fully qualified S3 URI for the s3 object to read

  • s3_client – an optional botocore.client.BaseClient for s3

Returns

geojson features

aio_aws.s3_io.get_s3_content(s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None)[source]

Read an s3 URI

Parameters
  • s3_uri – a fully qualified S3 URI for the s3 object to read

  • s3_client – an optional botocore.client.BaseClient for s3

Returns

the data from the s3 object

aio_aws.s3_io.json_s3_dump(json_data: Any, s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None)Optional[str][source]

Write JSON to an s3 URI

Parameters
  • json_data – an object to json.dump

  • s3_uri – a fully qualified S3 URI for the s3 object to write

  • s3_client – an optional botocore.client.BaseClient for s3

Returns

the s3 URI on success

aio_aws.s3_io.json_s3_load(s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None)Any[source]

Load JSON from an s3 URI

Parameters
  • s3_uri – a fully qualified S3 URI for the s3 object to read

  • s3_client – an optional botocore.client.BaseClient for s3

Returns

data from the json load

aio_aws.s3_io.put_s3_content(data_file: str, s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None)Optional[str][source]

Write a file to an s3 URI

Parameters
  • data_file – a data file

  • s3_uri – a fully qualified S3 URI for the s3 object to write

  • s3_client – an optional botocore.client.BaseClient for s3

Returns

the s3 URI on success

aio_aws.s3_io.s3_file_info(s3_uri: Union[aio_aws.s3_uri.S3URI, str], s3_client: Optional[botocore.client.BaseClient] = None)aio_aws.s3_uri.S3Info[source]

Collect data from an S3 HEAD request for an S3URI

Parameters
  • s3_uri – a fully qualified S3 URI for the s3 object to read

  • s3_client – an optional botocore.client.BaseClient for s3

Returns

an S3Info object with HEAD data on success; on failure the S3Info object has no HEAD data

aio_aws.s3_io.s3_files_info(s3_uris: List[Union[aio_aws.s3_uri.S3URI, str]], s3_client: Optional[botocore.client.BaseClient] = None)List[aio_aws.s3_uri.S3Info][source]
aio_aws.s3_io.s3_io_client()botocore.client.BaseClient[source]

This creates a botocore.client.BaseClient that uses urllib3, which should be thread-safe, with a default connection pool.

This uses a botocore.config.Config with 3 retries using a standard back off method

Returns

a botocore.client.BaseClient for s3

aio_aws.s3_io.yaml_s3_dump(yaml_data: Any, s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None)Optional[str][source]

Write YAML to an s3 URI

Parameters
  • yaml_data – an object to yaml.dump

  • s3_uri – a fully qualified S3 URI for the s3 object to write

  • s3_client – an optional botocore.client.BaseClient for s3

Returns

the s3 URI on success

aio_aws.s3_io.yaml_s3_load(s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None)Any[source]

Load YAML from an s3 URI

Parameters
  • s3_uri – a fully qualified S3 URI for the s3 object to write

  • s3_client – an optional botocore.client.BaseClient for s3

Returns

data from the yaml load

S3URI

class aio_aws.s3_uri.S3Info(s3_uri: aio_aws.s3_uri.S3URI, s3_size: int = None, last_modified: datetime.datetime = None)[source]

Bases: object

property dict
property iso8601
property json
last_modified: datetime.datetime = None
s3_size: int = None
s3_uri: aio_aws.s3_uri.S3URI
class aio_aws.s3_uri.S3Object(bucket: str, key: str)[source]

Bases: object

Just the bucket_name and key for an s3.ObjectSummary.

This simple dataclass should work around problems with Pickle for an s3.ObjectSummary.

from aio_aws.s3_uri import S3Object

# assume obj is an s3.ObjectSummary
S3Object(bucket=obj.bucket_name, key=obj.key)
bucket: str
key: str
property s3_uri

s3_uri: str for s3://{bucket}/{key}

class aio_aws.s3_uri.S3Paths(s3_uri: str = '', bucket: str = '', key: str = '')[source]

Bases: object

S3 URI components

as_uri(protocol: str = 's3://')str[source]

a URI for {protocol}{bucket}/{key}

property bucket
glob_file_pattern()[source]

Construct a glob pattern relative to the key_path. The final glob pattern is: {key_path}/**/{file_stem}*.*

Returns

str

glob_pattern(glob_pattern: str = '**/*')[source]

Construct a glob pattern relative to the key_path. The final glob pattern is: str(PurePosixPath(self.key_path) / glob_pattern)

Parameters

glob_pattern – str defaults to '**/*'

Returns

str

static is_valid_bucket(bucket: str)bool[source]
property key
property key_file
property key_path
static parse_s3_uri(s3_uri: str)aio_aws.s3_uri.S3Paths[source]

Parse an S3 URI into components; the delimiter must be ‘/’

Parameters

s3_uri

Returns

S3Paths(bucket, key, key_path, key_file)

property protocol
property s3_uri

s3_uri: str for s3://{bucket}/{key}

class aio_aws.s3_uri.S3URI(s3_uri: str = '', bucket: str = '', key: str = '')[source]

Bases: aio_aws.s3_uri.S3Paths

S3 URI representation and parsing.

At present, this class is designed to represent one s3 object. Although it has some utilities for working with general bucket and key paths, it is designed to work with one s3 object.

For technical details on S3 Objects, refer to

The following are the rules for naming S3 buckets in all AWS Regions:

  • Bucket names must follow DNS-compliant naming conventions. Amazon S3 no longer supports creating bucket names that contain uppercase letters or underscores. This ensures that each bucket can be addressed using virtual host style addressing, such as https://myawsbucket.s3.amazonaws.com.

  • Bucket names must be unique across all existing bucket names in Amazon S3.

  • Bucket names must comply with DNS naming conventions.

  • Bucket names must be at least 3 and no more than 63 characters long.

  • Bucket names must not contain uppercase characters or underscores.

  • Bucket names must start with a lowercase letter or number.

  • Bucket names must be a series of one or more labels. Adjacent labels are separated by a single period (.). Bucket names can contain lowercase letters, numbers, and hyphens. Each label must start and end with a lowercase letter or a number.

  • Bucket names must not be formatted as an IP address (e.g., 192.168.5.4).

  • When you use virtual hosted–style buckets with Secure Sockets Layer (SSL), the SSL wildcard certificate only matches buckets that don’t contain periods. To work around this, use HTTP or write your own certificate verification logic. We recommend that you do not use periods (“.”) in bucket names when using virtual hosted-style buckets.

The following are the rules for naming S3 keys:

  • The name for a key is a sequence of Unicode characters whose UTF-8 encoding is at most 1024 bytes long.

Parameters
  • bucket – str for bucket in s3://{bucket}/{key}

  • key – str for key in s3://{bucket}/{key}

static parse_s3_uri(s3_uri: str)aio_aws.s3_uri.S3URI[source]

Parse an S3 URI into components; the delimiter must be ‘/’

Parameters

s3_uri

Returns

S3Paths(bucket, key, key_path, key_file)

s3_bucket()s3.Bucket[source]

A boto3.resource('s3').Bucket('bucket')

s3_derivatives()Iterable[source]

Use s3_objects(glob_file_pattern) to find all the derivatives with matching file names (key_file*.*) below the path of this file.

Returns

Iterable[s3.ObjectSummary]

s3_exists()bool[source]

Check the s3 file summary to confirm a file exists. :return: bool True if the s3 file exists

s3_head_request()dict[source]

Issue an HTTP HEAD request to get the s3 URI summary data.

Returns

dict of HEAD response data or an empty dict if it fails.

s3_last_modified()Optional[datetime.datetime][source]
s3_object_summary()s3.ObjectSummary[source]

Create an s3.ObjectSummary for this S3URI; this does not check if the object exists.

Returns

s3.ObjectSummary

s3_objects(glob_pattern: Optional[str] = None, prefix: Optional[str] = None)Iterable[source]

Search all the s3 objects at or below the s3_uri.bucket, using an optional filter prefix and/or a glob pattern (defaults to everything).

Using the s3_uri.key_path as a filter prefix is most often critical to limit the number of bucket objects to process. The addition of a glob pattern is applied to the results of all objects found; the glob pattern match is applied to each s3.ObjectSummary.key.

Parameters
  • prefix – str to specify a Prefix filter based on glob patterns

  • glob_pattern – str to specify a filter based on glob patterns

Returns

Iterable[s3.ObjectSummary]

aio_aws.s3_uri.bucket_validate(bucket_name: str)bool[source]

For technical details on S3 buckets, refer to

The following are the rules for naming S3 buckets in all AWS Regions:

  • Bucket names must follow DNS-compliant naming conventions. Amazon S3 no longer supports creating bucket names that contain uppercase letters or underscores. This ensures that each bucket can be addressed using virtual host style addressing, such as https://myawsbucket.s3.amazonaws.com.

  • Bucket names must be unique across all existing bucket names in Amazon S3.

  • Bucket names must comply with DNS naming conventions.

  • Bucket names must be at least 3 and no more than 63 characters long.

  • Bucket names must not contain uppercase characters or underscores.

  • Bucket names must start with a lowercase letter or number.

  • Bucket names must be a series of one or more labels. Adjacent labels are separated by a single period (.). Bucket names can contain lowercase letters, numbers, and hyphens. Each label must start and end with a lowercase letter or a number.

  • Bucket names must not be formatted as an IP address (e.g., 192.168.5.4).

  • When you use virtual hosted–style buckets with Secure Sockets Layer (SSL), the SSL wildcard certificate only matches buckets that don’t contain periods. To work around this, use HTTP or write your own certificate verification logic. We recommend that you do not use periods (“.”) in bucket names when using virtual hosted-style buckets.

Parameters

bucket_name

Returns

True if the bucket_name is valid

Raises

ValueError if the bucket_name is not valid

aio_aws.utils.datetime_from_unix_milliseconds(msec: float)datetime.datetime[source]
aio_aws.utils.datetime_to_http_date(dt: datetime.datetime)str[source]

Parse a datetime into an HTTP date string (using GMT).

Parameters

dt – a datetime

Returns

an HTTP date string, e.g. “Mon, 23 Mar 2020 15:29:33 GMT”

aio_aws.utils.datetime_to_unix_milliseconds(dt: datetime.datetime)int[source]

datetime to unix timestamp - time since the epoch in milliseconds

aio_aws.utils.handle_head_error_code(error: botocore.exceptions.ClientError, item: Optional[str] = None)Optional[bool][source]
aio_aws.utils.http_date_to_datetime(http_date: str)datetime.datetime[source]

Parse a HTTP date string into a datetime.

Parameters

http_date – e.g. “Mon, 23 Mar 2020 15:29:33 GMT”

Returns

a datetime

aio_aws.utils.http_date_to_timestamp(http_date: str)float[source]

Parse a HTTP date string into a timestamp as seconds since the epoch.

Parameters

http_date – e.g. “Mon, 23 Mar 2020 15:29:33 GMT”

Returns

a timestamp (seconds since the epoch)

aio_aws.utils.response_code(response)int[source]
aio_aws.utils.response_success(response)bool[source]
aio_aws.utils.timestamp_to_http_date(ts: float)str[source]

Parse a timestamp (seconds since the epoch) into an HTTP date string (using GMT).

Parameters

ts – a timestamp (seconds since the epoch)

Returns

an HTTP date string, e.g. “Mon, 23 Mar 2020 15:29:33 GMT”

aio_aws.utils.utc_now()datetime.datetime[source]

UTC datetime - tz aware

aio_aws.utils.utc_timestamp()float[source]

Unix timestamp - time since the epoch in seconds

aio_aws.utils.utc_unix_milliseconds()int[source]

Unix timestamp - time since the epoch in milliseconds

aio_aws.uuid_utils.generate_hex_uuids(n_uuids: int)Iterator[str][source]

Generate an iterator of UUIDs

aio_aws.uuid_utils.generate_uuids(n_uuids: int)Iterator[str][source]

Generate an iterator of UUIDs

aio_aws.uuid_utils.get_hex_uuid(_: Optional[int] = None)str[source]

Get a UUID as a hex string value

aio_aws.uuid_utils.get_hex_uuids(n_uuids: int)List[str][source]

Get a list of UUIDs

aio_aws.uuid_utils.get_uuid(_: Optional[int] = None)str[source]

Get a UUID as a string value

aio_aws.uuid_utils.get_uuids(n_uuids: int)List[str][source]

Get a list of UUIDs

aio_aws.uuid_utils.valid_hex_uuid4(uuid)[source]
aio_aws.uuid_utils.valid_uuid4(uuid)[source]