AioAws
AioAWS Settings
Using asyncio for AWS services requires the aiobotocore library, which wraps a
release of botocore to patch it with features for async coroutines using
asyncio and aiohttp. To avoid issuing too many concurrent requests (which can resemble a
denial-of-service attack), the async approach should use a client connection limiter based on asyncio.Semaphore().
It’s recommended to use a single session and a single client with a connection pool.
Although there are context manager patterns, it’s also possible to manage closing the client
after everything is done.
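A minimal sketch of the semaphore-based limiter described above, using only the standard library. The fake_request coroutine and the limit of 3 are hypothetical stand-ins for a real AWS client call and a real pool size; they are not part of aio_aws.

```python
import asyncio

MAX_CONCURRENT = 3  # hypothetical limit, chosen only for this sketch


async def fake_request(task_id: int, sem: asyncio.Semaphore, active: list) -> int:
    """Stand-in for an AWS client call; `active` holds [current, peak] counts."""
    async with sem:  # at most MAX_CONCURRENT coroutines are inside this block
        active[0] += 1
        active[1] = max(active[1], active[0])  # record the peak concurrency
        await asyncio.sleep(0.01)  # simulate network latency
        active[0] -= 1
    return task_id


async def run_limited(n_tasks: int) -> tuple:
    sem = asyncio.Semaphore(MAX_CONCURRENT)  # created inside the running loop
    active = [0, 0]
    results = await asyncio.gather(
        *(fake_request(i, sem, active) for i in range(n_tasks))
    )
    return results, active[1]


results, peak = asyncio.run(run_limited(10))
print(f"completed {len(results)} requests, peak concurrency {peak}")
```

All ten coroutines are scheduled at once, but the semaphore keeps the number of in-flight requests at or below the limit.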
- class aio_aws.aio_aws_config.AioAWSConfig(aws_region: str = None, retries: int = 4, min_pause: float = 5, max_pause: float = 30, min_jitter: float = 1, max_jitter: float = 10, max_pool_connections: int = 10, sem: int = 10, session: aiobotocore.session.AioSession = None)[source]
Bases: object
- aws_region: str = None
an optional AWS region name
- create_client(service: str, *args, **kwargs) aiobotocore.client.AioBaseClient[source]
Create and yield a new client using the AioAWSConfig.session; the clients are configured with a default limit on the size of the connection pool defined by AioAWSConfig.max_pool_connections.
from aio_aws.aio_aws_config import AioAWSConfig
# bucket_name is a placeholder for an existing S3 bucket name
config = AioAWSConfig()
async with config.create_client("s3") as client:
    response = await client.head_bucket(Bucket=bucket_name)
It is possible to pass through additional args and kwargs including config=botocore.client.Config.
- Yield
an aiobotocore.client.AioBaseClient for AWS service
- property default_client_config: botocore.config.Config
- max_jitter: float = 10
the upper bound, in seconds, of an asyncio.sleep for random.uniform(min_jitter, max_jitter)
- max_pause: float = 30
the upper bound, in seconds, of an asyncio.sleep for random.uniform(min_pause, max_pause)
- max_pool_connections: int = 10
defines a limit to the number of client connections
- min_jitter: float = 1
the lower bound, in seconds, of an asyncio.sleep for random.uniform(min_jitter, max_jitter)
- min_pause: float = 5
the lower bound, in seconds, of an asyncio.sleep for random.uniform(min_pause, max_pause)
- retries: int = 4
a number of retries for an AWS client request/response
- sem: int = 10
- property semaphore: asyncio.locks.Semaphore
An asyncio.Semaphore to limit the number of concurrent clients; this defaults to the session client connection pool
- session: aiobotocore.session.AioSession = None
an aiobotocore.session.AioSession
- aio_aws.aio_aws_config.MAX_JITTER: float = 10
Maximum API request jitter
- aio_aws.aio_aws_config.MAX_PAUSE: float = 30
Maximum task pause
- aio_aws.aio_aws_config.MAX_POOL_CONNECTIONS = 10
max_pool_connections for AWS clients (10 by default)
- aio_aws.aio_aws_config.MIN_JITTER: float = 1
Minimum API request jitter
- aio_aws.aio_aws_config.MIN_PAUSE: float = 5
Minimum task pause
- aio_aws.aio_aws_config.RETRY_EXCEPTIONS = ['TooManyRequestsException', 'ThrottlingException']
Common exception codes that are retried
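A minimal sketch of how a list of retryable error codes like this can drive a retry loop with random jitter. FakeClientError is a hypothetical stand-in for botocore.exceptions.ClientError (which carries the code in response["Error"]["Code"]), and treating retries as the total number of attempts is an assumption of this sketch, not a statement about aio_aws.

```python
import asyncio
import random

RETRY_EXCEPTIONS = ["TooManyRequestsException", "ThrottlingException"]


class FakeClientError(Exception):
    """Hypothetical stand-in for botocore.exceptions.ClientError."""

    def __init__(self, code: str):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


async def call_with_retries(request, retries: int = 4,
                            min_jitter: float = 0.001, max_jitter: float = 0.002):
    """Await request(); retry only when the error code is retryable."""
    for attempt in range(1, retries + 1):
        try:
            return await request()
        except FakeClientError as err:
            code = err.response.get("Error", {}).get("Code")
            if code not in RETRY_EXCEPTIONS or attempt == retries:
                raise  # not retryable, or no attempts left
            # random pause so concurrent tasks do not retry in lockstep
            await asyncio.sleep(random.uniform(min_jitter, max_jitter))


async def demo():
    calls = {"n": 0}

    async def flaky_request():
        calls["n"] += 1
        if calls["n"] < 3:  # the first two calls are throttled
            raise FakeClientError("ThrottlingException")
        return "ok"

    result = await call_with_retries(flaky_request)
    return result, calls["n"]


result, n_calls = asyncio.run(demo())
print(result, n_calls)
```

Any error code outside the retry list is re-raised immediately, so genuine failures are not hidden by the retry loop.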
- aio_aws.aio_aws_config.aio_aws_client(service_name: str, *args, **kwargs) aiobotocore.client.AioBaseClient[source]
Yield an asyncio AWS client with an option to provide a client-specific config; this is a thin wrapper on aiobotocore.session.get_session().create_client(), and any additional kwargs are passed through to session.create_client(**kwargs).
It is possible to pass through additional args and kwargs including config: aiobotocore.config.AioConfig (the default is aio_aws_default_config())
import aiobotocore.client
import botocore.client
from aio_aws.aio_aws_config import aio_aws_client
# a local endpoint, e.g. for a mock S3 server
s3_endpoint = "http://localhost:5555"
client_config = botocore.client.Config(
    read_timeout=120,
    max_pool_connections=50,
)
async with aio_aws_client(
    "s3", endpoint_url=s3_endpoint, config=client_config
) as client:
    assert "aiobotocore.client.S3" in str(client)
    assert isinstance(client, aiobotocore.client.AioBaseClient)
- Parameters
service_name – an AWS service for a client, like “s3”; try session.get_available_services()
- Yield
aiobotocore.client.AioBaseClient
- aio_aws.aio_aws_config.aio_aws_default_config() aiobotocore.config.AioConfig[source]
Get a default asyncio AWS config using a default MAX_POOL_CONNECTIONS
- Returns
aiobotocore.config.AioConfig
- aio_aws.aio_aws_config.aio_aws_default_session() aiobotocore.session.AioSession[source]
Get a default asyncio AWS session with a default config from aio_aws_default_config()
- Returns
aiobotocore.session.AioSession
- aio_aws.aio_aws_config.aio_aws_session(aio_aws_config: Optional[aiobotocore.config.AioConfig] = None) aiobotocore.session.AioSession[source]
Get an asyncio AWS session with an ‘aio-aws’ user agent name
- Parameters
aio_aws_config – an aiobotocore.config.AioConfig (default aio_aws_default_config())
- Returns
aiobotocore.session.AioSession
- aio_aws.aio_aws_config.asyncio_default_semaphore() asyncio.locks.Semaphore[source]
A default semaphore to limit creation of clients; it defaults to 2 * MAX_POOL_CONNECTIONS
- Returns
asyncio.Semaphore
- async aio_aws.aio_aws_config.delay(task_id: str, min_pause: float = 5, max_pause: float = 30) float[source]
- async aio_aws.aio_aws_config.jitter(task_id: str = 'jitter', min_jitter: float = 1, max_jitter: float = 10) float[source]
Await a random pause between min_jitter and max_jitter
- Parameters
task_id – an optional ID for the task awaiting this jitter
min_jitter – defaults to MIN_JITTER
max_jitter – defaults to MAX_JITTER
- Returns
random interval for pause
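A minimal sketch of a jitter coroutine with the same signature as above, assuming it does no more than sleep for a random interval and return it. The name jitter_sketch is hypothetical; the actual aio_aws implementation may also log the task_id or differ in other details.

```python
import asyncio
import random


async def jitter_sketch(task_id: str = "jitter",
                        min_jitter: float = 1,
                        max_jitter: float = 10) -> float:
    """Sleep for a random interval in [min_jitter, max_jitter] seconds."""
    pause = random.uniform(min_jitter, max_jitter)
    await asyncio.sleep(pause)  # yields control to other tasks while paused
    return pause


# very small bounds so the demonstration finishes quickly
pause = asyncio.run(jitter_sketch("demo-task", min_jitter=0.01, max_jitter=0.02))
print(f"paused for {pause:.3f} seconds")
```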
AioAWS Batch
In testing this, it’s able to run and monitor 100s of jobs from a laptop with modest CPU/RAM resources. It can recover from a crash by using a db-state, without re-running the same jobs (by jobName). It should be able to scale to run 1000s of jobs.
To run the example, the aio_aws.aio_aws_batch module has a main that will run and
manage about 5 live batch jobs (very small sleep jobs that don’t cost much to run). The
job state is persisted to aws_batch_jobs.json and if it runs successfully, it will not
run the jobs again; the TinyDB is used to recover job state by jobName. (This demo
assumes that some simple AWS Batch infrastructure exists already.)
# setup the python virtualenv
# check the main details and modify for a preferred batch queue/CE and AWS region
$ ./aio_aws/aio_aws_batch.py
Test async batch jobs
# wait a few minutes and watch the status messages
# it submits and monitors the jobs until they complete
# job status is saved and updated in `aws_batch_jobs.json`
# when it's done, run it again and see that nothing is re-submitted
$ ./aio_aws/aio_aws_batch.py
If the job monitoring is halted for some reason (like Ctrl-C), it can recover from
the db-state, e.g.
$ ./aio_aws/aio_aws_batch.py
Test async batch jobs
[INFO] 2020-03-05T14:51:53.372Z aio-aws:<module>:485 AWS Batch job (test-sleep-job-0000) recovered from db
[INFO] 2020-03-05T14:51:53.373Z aio-aws:<module>:485 AWS Batch job (test-sleep-job-0001) recovered from db
[INFO] 2020-03-05T14:51:53.373Z aio-aws:<module>:485 AWS Batch job (test-sleep-job-0002) recovered from db
[INFO] 2020-03-05T14:51:53.374Z aio-aws:<module>:485 AWS Batch job (test-sleep-job-0003) recovered from db
[INFO] 2020-03-05T14:51:53.374Z aio-aws:<module>:485 AWS Batch job (test-sleep-job-0004) recovered from db
[INFO] 2020-03-05T14:51:53.690Z aio-aws:aio_batch_job_waiter:375 AWS Batch job (846d54d4-c3c3-4a3b-9101-646d78d3bbfb) status: RUNNABLE
[INFO] 2020-03-05T14:51:53.692Z aio-aws:aio_batch_job_waiter:375 AWS Batch job (dfce3461-9eab-4f5b-846c-6f223d593f6f) status: RUNNABLE
[INFO] 2020-03-05T14:51:53.693Z aio-aws:aio_batch_job_waiter:375 AWS Batch job (637e6b27-8d4d-4f45-b988-c00775461616) status: RUNNABLE
[INFO] 2020-03-05T14:51:53.701Z aio-aws:aio_batch_job_waiter:375 AWS Batch job (d9ac27c9-e7d3-49cd-8f53-c84a9b4c1750) status: RUNNABLE
[INFO] 2020-03-05T14:51:53.732Z aio-aws:aio_batch_job_waiter:375 AWS Batch job (7ebfe7c4-44a4-40d6-9eab-3708e334689d) status: RUNNABLE
For the demo to run quickly, most of the module settings are fit for fast jobs. For much longer running jobs, there are functions that only submit jobs or check jobs and the settings should be changed for monitoring jobs to only check every 10 or 20 minutes.
Quick Start
from aio_aws.aws_batch_models import AWSBatchJob
from aio_aws.aio_aws_batch import batch_monitor_jobs
from aio_aws.aio_aws_batch import batch_submit_jobs
# your_job_definition_arn and your_job_queue_arn are placeholders
# for existing AWS Batch resources
job1 = AWSBatchJob(
    job_name="sleep-1-job",
    job_definition=your_job_definition_arn,
    job_queue=your_job_queue_arn,
    command=["/bin/sh", "-c", "echo Hello && sleep 1 && echo Bye"],
)
job2 = AWSBatchJob(
    job_name="sleep-2-job",
    job_definition=your_job_definition_arn,
    job_queue=your_job_queue_arn,
    command=["/bin/sh", "-c", "echo Hello && sleep 2 && echo Bye"],
)
# each job object maintains its own state as it is submitted and monitored
jobs = [job1, job2]
batch_submit_jobs(jobs=jobs)
batch_monitor_jobs(jobs=jobs)
This example uses high-level synchronous wrappers to make it easy to get started. There are a lot of details hidden in these convenient wrappers, but all the code is easily read in aio_aws.aio_aws_batch. For advanced use, please read the source code, especially the unit tests in the code repo. There is very little time to maintain the documentation.
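A minimal sketch of the general pattern behind a synchronous wrapper: a plain function that runs an async coroutine to completion on an event loop. All of the names here (aio_submit_sketch, aio_submit_all, submit_all) and the dictionary job records are hypothetical; how the aio_aws wrappers are actually implemented should be checked in the source code.

```python
import asyncio
from typing import Dict, List


async def aio_submit_sketch(job: Dict) -> Dict:
    """Stand-in for an async submission; it sets a fake job_id on the job."""
    await asyncio.sleep(0.001)  # simulate the API round trip
    job["job_id"] = f"id-{job['job_name']}"
    return job


async def aio_submit_all(jobs: List[Dict]) -> None:
    """Submit, concurrently, only the jobs that have no job_id yet."""
    pending = [job for job in jobs if not job.get("job_id")]
    await asyncio.gather(*(aio_submit_sketch(job) for job in pending))


def submit_all(jobs: List[Dict]) -> None:
    """Synchronous entry point: run the coroutine until it completes."""
    asyncio.run(aio_submit_all(jobs))


jobs = [{"job_name": "sleep-1-job"}, {"job_name": "sleep-2-job"}]
submit_all(jobs)  # the jobs are updated in place
print([job["job_id"] for job in jobs])
```

Calling the wrapper a second time submits nothing, because every job already has a job_id.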
Monitoring Jobs
The aio_aws.aio_aws_batch.aio_batch_job_manager() can submit a job, wait
for it to complete and retry if it fails on a SPOT termination. It saves the job status
using an optional jobs-db (see AWSBatchConfig.aio_batch_db). The job manager
uses aio_aws.aio_aws_batch.aio_batch_job_waiter(), which uses these AWSBatchConfig settings
to control the async-wait between polling the job status:
- start_pause
- min_pause
- max_pause
These settings control how often job descriptions are polled. These requests for job status are also limited by the client connection pool and the client semaphore used by the job manager. Since AWS Batch has API limits on the number of requests for job status, it’s best to use a client connection pool and semaphore of about 10 connections. Any failures to poll for a job status will be retried a few times (using some random jitter on retry rates).
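A minimal sketch of a polling loop driven by these pause settings. It assumes, as the config documentation states, that a startup pause of random.uniform(start_pause, start_pause * 2) applies while the job is SUBMITTED, PENDING or RUNNABLE, and that random.uniform(min_pause, max_pause) applies otherwise. The function names and the iterator of statuses (standing in for describe_jobs polls) are hypothetical.

```python
import asyncio
import random
from typing import Iterator, List

STARTUP_STATES = ["SUBMITTED", "PENDING", "RUNNABLE"]
FINAL_STATES = ["SUCCEEDED", "FAILED"]


def next_pause(status: str, start_pause: float,
               min_pause: float, max_pause: float) -> float:
    """Choose the pause before the next poll, based on the job status."""
    if status in STARTUP_STATES:
        return random.uniform(start_pause, start_pause * 2)
    return random.uniform(min_pause, max_pause)


async def wait_for_job(statuses: Iterator[str], start_pause: float = 0.004,
                       min_pause: float = 0.001, max_pause: float = 0.002) -> List[str]:
    """Poll until a final status is seen; return every status observed."""
    seen = []
    for status in statuses:  # each item stands in for one status poll
        seen.append(status)
        if status in FINAL_STATES:
            break  # the waiter exits on SUCCEEDED or FAILED
        await asyncio.sleep(next_pause(status, start_pause, min_pause, max_pause))
    return seen


fake_polls = iter(["SUBMITTED", "RUNNABLE", "STARTING", "RUNNING", "SUCCEEDED", "FAILED"])
polled = asyncio.run(wait_for_job(fake_polls))
print(polled)
```

The pauses here are milliseconds so the sketch runs quickly; real jobs would use values closer to the defaults of 5 to 30 seconds or longer.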
To modify the polling frequency settings, use a custom config. For example, the unit test suite uses much faster polling on mock batch jobs to speed up the unit tests; e.g.
config = AWSBatchConfig(
    start_pause=0.4, min_pause=0.8, max_pause=1.0, min_jitter=0.1, max_jitter=0.2,
)
- class aio_aws.aio_aws_batch.AWSBatchConfig(aws_region: str = None, retries: int = 4, min_pause: float = 5, max_pause: float = 30, min_jitter: float = 1, max_jitter: float = 10, max_pool_connections: int = 2, sem: int = 100, session: aiobotocore.session.AioSession = None, start_pause: float = 30, aio_batch_db: Union[aio_aws.aio_aws_batch_db.AioAWSBatchDB, NoneType] = None)[source]
Bases: aio_aws.aio_aws_config.AioAWSConfig
- aio_batch_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None
an optional AioAWSBatchDB
- create_batch_client(*args, **kwargs) aiobotocore.client.AioBaseClient[source]
Create and yield an AWS Batch client using the AWSBatchConfig.session
- Yield
an aiobotocore.client.AioBaseClient for AWS Batch
- create_logs_client(*args, **kwargs) aiobotocore.client.AioBaseClient[source]
Create and yield an AWS CloudWatchLogs client using the AWSBatchConfig.session
- Yield
an aiobotocore.client.AioBaseClient for AWS CloudWatchLogs
- property default_client_config: botocore.config.Config
- max_pool_connections: int = 2
defines a limit to the number of client connections
- sem: int = 100
- start_pause: float = 30
a batch job startup pause,
random.uniform(start_pause, start_pause * 2); this applies when the job status is in [“SUBMITTED”, “PENDING”, “RUNNABLE”]
- aio_aws.aio_aws_batch.BATCH_STARTUP_PAUSE: float = 30
batch job startup pause (seconds)
- async aio_aws.aio_aws_batch.aio_batch_cancel_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], reason: str = 'User cancelled job', config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]
Cancel jobs that can be cancelled (if they are not running or complete).
- Parameters
jobs – any AWSBatchJob
config – an AWSBatchConfig
- Returns
each job maintains state, so this function returns nothing
- async aio_aws.aio_aws_batch.aio_batch_describe_jobs(job_ids: List[str], config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) Optional[Dict][source]
Asynchronous coroutine to issue a batch job description request that retrieves status information for up to 100 job IDs
- Parameters
job_ids – a list of batch jobId
config – settings for task pauses between retries
- Returns
a describe_jobs response
- Raises
botocore.exceptions.ClientError
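Since a single description request covers at most 100 job IDs, a longer list has to be split into chunks first. This is a minimal sketch of that chunking; chunk_job_ids and the generated job IDs are hypothetical and not part of aio_aws.

```python
from typing import Iterator, List

MAX_JOB_IDS = 100  # the per-request limit noted above


def chunk_job_ids(job_ids: List[str], size: int = MAX_JOB_IDS) -> Iterator[List[str]]:
    """Yield consecutive slices of job_ids with at most `size` items each."""
    for start in range(0, len(job_ids), size):
        yield job_ids[start:start + size]


job_ids = [f"job-{i:04d}" for i in range(250)]
chunks = list(chunk_job_ids(job_ids))
print([len(chunk) for chunk in chunks])  # prints [100, 100, 50]
```

Each chunk could then be passed as the job_ids argument of one description request.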
- async aio_aws.aio_aws_batch.aio_batch_get_logs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig, skip_existing: bool = False)[source]
Get job logs. The logs should be updated in any configured jobs-db.
- Parameters
jobs – any AWSBatchJob
config – an AWSBatchConfig
skip_existing – skip jobs that already have logs; this is useful if some jobs already have complete log events captured
- Returns
each job maintains state, so this function returns nothing
- async aio_aws.aio_aws_batch.aio_batch_job_cancel(job: aio_aws.aws_batch_models.AWSBatchJob, reason: str = 'User cancelled job', config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) aio_aws.aws_batch_models.AWSBatchJob[source]
Asynchronous coroutine to cancel a job in a batch job queue. Jobs that are in the SUBMITTED, PENDING, or RUNNABLE state are canceled. Jobs that have progressed to STARTING or RUNNING are not canceled, but the API operation still succeeds, even if no job is canceled. These jobs must be terminated.
- Parameters
job – an AWSBatchJob
reason – a reason to cancel the job
config – settings for task pauses between retries
- Returns
an updated AWSBatchJob
- Raises
botocore.exceptions.ClientError
- Raises
RetryError if it exceeds retries
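A minimal sketch of the cancel-versus-terminate rule described above, as a pure function on the job status. The function name stop_action and its return values are hypothetical; they only illustrate which request is effective for each status.

```python
CANCEL_STATES = {"SUBMITTED", "PENDING", "RUNNABLE"}
TERMINATE_STATES = {"STARTING", "RUNNING"}


def stop_action(status: str) -> str:
    """Return the request needed to stop a job with this status."""
    if status in CANCEL_STATES:
        return "cancel"  # the job has not started, so it can be canceled
    if status in TERMINATE_STATES:
        return "terminate"  # a cancel request would succeed but do nothing
    return "none"  # SUCCEEDED or FAILED jobs are already complete


for status in ["RUNNABLE", "RUNNING", "SUCCEEDED"]:
    print(status, "->", stop_action(status))
```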
- async aio_aws.aio_aws_batch.aio_batch_job_logs(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None, skip_existing: bool = False) Optional[List[Dict]][source]
Asynchronous coroutine to get logs for a batch job log stream. All the events available for a log stream are collected and returned. AWS CloudWatch log streams can have delays, so don’t expect near real-time values in these data.
- Parameters
job – A set of job parameters
config – settings for task pauses between retries
skip_existing – skip jobs that already have logs; this is useful if some jobs already have complete log events captured
- Returns
a list of all the log events from get_log_events responses
- Raises
botocore.exceptions.ClientError
- Raises
RetryError if it exceeds retries
- async aio_aws.aio_aws_batch.aio_batch_job_manager(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) aio_aws.aws_batch_models.AWSBatchJob[source]
Asynchronous coroutine to manage a batch job. The job-manager will update attributes on the job as the job passes through submission and completion. On completion, the job logs are retrieved. As the job status and logs are collected, the job data is persisted to the jobs_db.
Note that any job with a job.job_id will not re-run; those jobs will be monitored until complete. To re-run a job that has already run, first call the job.reset() method to clear any previous job state. (Any previous attempts are recorded in job.job_tries and in the job.job_description.)
- Parameters
job – a batch job spec
config – settings for task pauses between retries
- Returns
the AWSBatchJob, updated with the latest job description, when it’s complete
- Raises
botocore.exceptions.ClientError
- async aio_aws.aio_aws_batch.aio_batch_job_status(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) aio_aws.aws_batch_models.AWSBatchJob[source]
Asynchronous coroutine to update a single job description. The job data is updated (it’s saved to an optional jobs-db).
Use aio_batch_update_jobs() for more efficient updates to many jobs.
- Parameters
job – a batch job
config – settings for task pauses between retries
- Returns
an updated AWSBatchJob
- Raises
botocore.exceptions.ClientError
- async aio_aws.aio_aws_batch.aio_batch_job_submit(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) aio_aws.aws_batch_models.AWSBatchJob[source]
Submit a batch job; for a successful submission, the jobId and submission details are updated on the job object (and an optional jobs-db).
- Parameters
job – A set of job parameters
config – settings for task pauses between retries
- Returns
an updated AWSBatchJob (modified in-place)
- Raises
botocore.exceptions.ClientError
- async aio_aws.aio_aws_batch.aio_batch_job_terminate(job: aio_aws.aws_batch_models.AWSBatchJob, reason: str = 'User terminated job', config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) aio_aws.aws_batch_models.AWSBatchJob[source]
Asynchronous coroutine to terminate a batch job
- Parameters
job – an AWSBatchJob
reason – a reason to terminate the job
config – settings for task pauses between retries
- Returns
an AWSBatchJob after it is terminated
- Raises
botocore.exceptions.ClientError
- async aio_aws.aio_aws_batch.aio_batch_job_waiter(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) Optional[aio_aws.aws_batch_models.AWSBatchJob][source]
Asynchronous coroutine to wait on a batch job. There is no explicit timeout on a job waiter; it depends on setting a timeout on the batch job definition. The job waiter exits when the batch job status is either “SUCCEEDED” or “FAILED”. The job data is updated each time the job description is polled (it’s also saved to the jobs-db).
Job status identifiers are assumed to be: [“SUBMITTED”, “PENDING”, “RUNNABLE”, “STARTING”, “RUNNING”, “FAILED”, “SUCCEEDED”]
- Parameters
job – a batch job
config – settings for task pauses between retries
- Returns
the AWSBatchJob, updated with the latest job description, when it’s complete
- Raises
botocore.exceptions.ClientError
- async aio_aws.aio_aws_batch.aio_batch_monitor_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)[source]
Monitor submitted jobs until they complete.
- Parameters
jobs – any AWSBatchJob
config – an AWSBatchConfig
- Returns
each job maintains state, so this function returns nothing
- async aio_aws.aio_aws_batch.aio_batch_run_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)[source]
Submit jobs that have not been submitted yet, and monitor all jobs until they complete.
- Parameters
jobs – any AWSBatchJob
config – an AWSBatchConfig
- Returns
each job maintains state, so this function returns nothing
- async aio_aws.aio_aws_batch.aio_batch_submit_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)[source]
Submit jobs that have not been submitted yet.
- Parameters
jobs – any AWSBatchJob
config – an AWSBatchConfig
- Returns
each job maintains state, so this function returns nothing
- async aio_aws.aio_aws_batch.aio_batch_terminate_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], reason: str = 'User terminated job', config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]
Terminate jobs that can be killed (if they are not complete).
- Parameters
jobs – any AWSBatchJob
config – an AWSBatchConfig
- Returns
each job maintains state, so this function returns nothing
- async aio_aws.aio_aws_batch.aio_batch_update_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)[source]
Asynchronous coroutine to update status on batch jobs.
job status identifiers in the Batch service are: [“SUBMITTED”, “PENDING”, “RUNNABLE”, “STARTING”, “RUNNING”, “FAILED”, “SUCCEEDED”]
- Parameters
jobs – a list of batch jobs
config – settings for task pauses between retries
- Returns
jobs are updated in-place, with no return
- Raises
botocore.exceptions.ClientError
- async aio_aws.aio_aws_batch.aio_find_complete_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)[source]
Find any complete jobs
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)
- Returns
yield jobs found
- async aio_aws.aio_aws_batch.aio_find_jobs_by_status(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], job_states: List[str], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)[source]
Find any jobs that match job states.
This is most often used when jobs are regenerated for jobs that could exist in the jobs-db; there are alternative methods on the jobs-db to find all jobs matching various job states.
- Parameters
jobs – any AWSBatchJob
job_states – a list of valid job states
jobs_db – a jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)
- Returns
yield jobs found
- async aio_aws.aio_aws_batch.aio_find_running_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)[source]
Find any running jobs
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)
- Returns
yield jobs found
- aio_aws.aio_aws_batch.batch_cancel_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], reason: str = 'User cancelled job', jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]
Cancel jobs that can be cancelled (if they are not running or complete).
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing
- aio_aws.aio_aws_batch.batch_get_logs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None, skip_existing: bool = False)[source]
Get job logs.
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db
skip_existing – skip jobs that already have logs; this is useful if some jobs already have complete log events captured
- Returns
each job maintains state, so this function returns nothing
- aio_aws.aio_aws_batch.batch_monitor_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]
Monitor submitted jobs until they complete.
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing
- aio_aws.aio_aws_batch.batch_run_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]
Submit jobs that have not been submitted yet, and monitor all jobs until they complete.
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing
- aio_aws.aio_aws_batch.batch_submit_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]
Submit jobs that have not been submitted yet.
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing
- aio_aws.aio_aws_batch.batch_terminate_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], reason: str = 'User terminated job', jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]
Terminate jobs that can be killed (if they are not complete).
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing
- aio_aws.aio_aws_batch.batch_update_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)[source]
Update job descriptions.
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing
- aio_aws.aio_aws_batch.batch_update_jobs_db(jobs_db: aio_aws.aio_aws_batch_db.AioAWSBatchDB)[source]
Update all the jobs-db job descriptions. This is a synchronous wrapper on aio_batch_update_jobs() for all jobs in a jobs-db.
- Parameters
jobs_db – a jobs-db to update
- Returns
the jobs-db is updated, so this function returns nothing
- aio_aws.aio_aws_batch.find_complete_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)[source]
Find any complete jobs
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)
- Returns
yield jobs found
- aio_aws.aio_aws_batch.find_incomplete_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, reset_failed: bool = False)[source]
This can find any jobs that are not complete. It can also reset FAILED jobs that could be run again.
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)
reset_failed – if enabled, any FAILED jobs will be reset so they can be run again
- Returns
yield jobs found
- aio_aws.aio_aws_batch.find_jobs_by_status(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], job_states: List[str], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)[source]
Find any jobs that match job states.
This is most often used when jobs are regenerated for jobs that could exist in the jobs-db; there are alternative methods on the jobs-db to find all jobs matching various job states.
- Parameters
jobs – any AWSBatchJob
job_states – a list of valid job states
jobs_db – a jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)
- Returns
yield jobs found
- aio_aws.aio_aws_batch.find_latest_jobs_with_jobs_db(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None) Generator[aio_aws.aws_batch_models.AWSBatchJob, None, None][source]
Find the latest jobs data, using the job itself or a fallback to check the latest data in the jobs-db.
This is most often used when jobs are regenerated for jobs that could exist in the jobs-db; there are alternative methods on the jobs-db to find all jobs matching various job states.
- Parameters
jobs – any AWSBatchJob
jobs_db – a jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)
- Returns
yield jobs found
- aio_aws.aio_aws_batch.find_running_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)[source]
Find any running jobs
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to check the latest job status from the jobs-db, if it is not available on the job (highly recommended)
- Returns
yield jobs found
- aio_aws.aio_aws_batch.get_logs_by_status(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], job_states: List[str], jobs_db: aio_aws.aio_aws_batch_db.AioAWSBatchDB, print_logs: bool = False)[source]
A utility function to get AWS Batch logs by job status, to update the logs in a jobs-db, with an option to print the logs.
- Parameters
jobs – any AWSBatchJob to filter by status
job_states – any AWS Batch job status
jobs_db – an instance of an AioAWSBatchDB
print_logs – an option to print the logs
- Returns
None
- aio_aws.aio_aws_batch.job_for_status(job, job_states) Optional[aio_aws.aws_batch_models.AWSBatchJob][source]
AioAWS Batch-DB
- class aio_aws.aio_aws_batch_db.AioAWSBatchDB[source]
Bases: abc.ABC
Abstract Base Class for AWS Batch job databases
- abstract async all_jobs() List[aio_aws.aws_batch_models.AWSBatchJob][source]
Collect all jobs.
Warning: this could exceed memory; try to use gen_all_jobs() wherever possible.
- abstract async count_by_job_status() collections.Counter[source]
Count all jobs by jobStatus
- Returns
a Counter of jobs by job status (could contain multiple entries for the same jobName, if it is run more than once)
- abstract async find_by_job_id(job_id: str) Optional[Dict][source]
Find one job by the jobId
- Parameters
job_id – a batch jobId
- Returns
the job data or None
- abstract async find_by_job_name(job_name: str) List[Dict][source]
Find any jobs matching the jobName
- Parameters
job_name – a batch jobName
- Returns
a list of dictionaries containing job data
- abstract async find_by_job_status(job_states: List[str]) List[aio_aws.aws_batch_models.AWSBatchJob][source]
Find any jobs matching any job status values
- Parameters
job_states – a list of valid job states
- Returns
a list of AWSBatchJob (could contain multiple entries with the same jobName if it is run multiple times with any jobStatus in the jobStatus values)
- abstract async find_job_logs(job_id: str) Optional[Dict][source]
Find job logs by job_id
- Parameters
job_id – str
- Returns
job logs document
- abstract async find_jobs_to_run() List[aio_aws.aws_batch_models.AWSBatchJob][source]
Find all jobs that have not SUCCEEDED.
- abstract async find_latest_job_name(job_name: str) Optional[aio_aws.aws_batch_models.AWSBatchJob][source]
Find the latest job matching the jobName, based on the “createdAt” time stamp.
- Parameters
job_name – a batch jobName
- Returns
the latest job record available
- abstract async gen_all_jobs() AsyncIterator[aio_aws.aws_batch_models.AWSBatchJob][source]
Generate all jobs.
- abstract async group_by_job_status() Dict[str, List[Tuple[str, str, str]]][source]
Group all jobId by jobStatus
- Returns
a dictionary of job info by job status (could contain multiple entries with the same jobName if it is run multiple times); the dictionary values contain a list of job information tuples, e.g.
{ status: [ (job.job_id, job.job_name, job.status), ] }
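A minimal sketch of grouping and counting job information tuples by status, using only the standard library. The tuples and the function names are hypothetical; a real jobs-db implementation would read these values from its storage.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Tuple

JobInfo = Tuple[str, str, str]  # (job_id, job_name, status)


def group_by_status(jobs: List[JobInfo]) -> Dict[str, List[JobInfo]]:
    """Map each status to the list of job tuples with that status."""
    groups = defaultdict(list)
    for job in jobs:
        groups[job[2]].append(job)
    return dict(groups)


def count_by_status(jobs: List[JobInfo]) -> Counter:
    """Count the job tuples for each status."""
    return Counter(status for _, _, status in jobs)


jobs = [
    ("id-1", "job-a", "FAILED"),
    ("id-2", "job-a", "SUCCEEDED"),  # job-a was run twice
    ("id-3", "job-b", "SUCCEEDED"),
]
groups = group_by_status(jobs)
counts = count_by_status(jobs)
print(groups["SUCCEEDED"])
print(counts)
```

Because job-a ran twice, its job name appears under two different statuses, which matches the note above about multiple entries for the same jobName.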
- abstract async jobs_recovery(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], include_logs: bool = False) List[aio_aws.aws_batch_models.AWSBatchJob][source]
Use the job.job_name to find any jobs_db records to recover job data.
- Parameters
jobs – a List[AWSBatchJob] to recover
include_logs – also recover any saved logs
- Returns
List[AWSBatchJob] with saved data for any saved jobs
- abstract async jobs_to_run(jobs: List[aio_aws.aws_batch_models.AWSBatchJob]) List[aio_aws.aws_batch_models.AWSBatchJob][source]
Use the job.job_name to find any jobs_db records and check the job status on the latest submission of any job matching the job-name. For any job that has a matching job-name in the jobs_db, check whether the job has already run and SUCCEEDED. Any jobs that have already run and SUCCEEDED are not returned.
Return all jobs that have not run yet or have not SUCCEEDED. Note that any jobs subsequently input to the job-manager will not re-run if they already have a job.job_id; those jobs will be monitored until complete. To re-run jobs that are returned from this filter function, first use job.reset() before passing the jobs to the job-manager.
Note
The input jobs are not modified in any way in this function, i.e. there are no updates from the jobs_db applied to the input jobs.
To recover jobs from the jobs_db, use:
- AWSBatchDB.find_jobs_to_run()
- AWSBatchDB.jobs_db.all()
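The filter described for jobs_to_run can be sketched with plain data. This is a simplified illustration, not the library's code; it assumes the latest saved status for each job name is already available in a dict, and the function name `filter_jobs_to_run` is hypothetical.

```python
from typing import Dict, List


def filter_jobs_to_run(job_names: List[str], latest_status: Dict[str, str]) -> List[str]:
    """Keep names with no saved record or a latest status other than SUCCEEDED.

    The inputs are not modified; a new list is returned.
    """
    return [name for name in job_names if latest_status.get(name) != "SUCCEEDED"]


# Assumed lookup of the latest saved status by job name
latest_status = {"job-a": "SUCCEEDED", "job-b": "FAILED"}
to_run = filter_jobs_to_run(["job-a", "job-b", "job-c"], latest_status)
```

In this sketch, `job-b` is returned because its latest run FAILED and `job-c` is returned because it has no saved record.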
- abstract async remove_by_job_id(job_id: str) Optional[Dict][source]
Remove any job matching the jobId
- Parameters
job_id – a batch jobId
- Returns
a deleted document
- abstract async remove_by_job_name(job_name: str) Set[str][source]
Remove any jobs matching the jobName
- Parameters
job_name – a batch jobName
- Returns
a set of deleted job-ids
- abstract async save_job(job: aio_aws.aws_batch_models.AWSBatchJob) Optional[str][source]
Insert or update a job (if it has a job_id)
- Parameters
job – an AWSBatchJob
- Returns
the jobId if the job is saved
- abstract async save_job_logs(job: aio_aws.aws_batch_models.AWSBatchJob) List[int][source]
Insert or update job logs (if the job has a job_id)
- Parameters
job – an AWSBatchJob
- Returns
a List[tinydb.database.Document.doc_id]
- class aio_aws.aio_aws_batch_db.AioAWSBatchRedisDB(redis_url: str = 'redis://127.0.0.1:6379', jobs_db_n: int = 2, logs_db_n: int = 4, max_connections: int = 1)[source]
Bases: aio_aws.aio_aws_batch_db.AioAWSBatchDB
AWS Batch job databases - aioredis implementation
The jobs and logs are kept in separate DBs so that performance on the jobs_db is not compromised by too much data from job logs. This makes it possible to use the jobs_db without capturing logs as well.
- async all_jobs() List[aio_aws.aws_batch_models.AWSBatchJob][source]
Collect all jobs.
Warning: this could exceed memory; try to use gen_all_jobs() wherever possible.
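The memory concern can be illustrated with an async generator that yields one record at a time instead of building a full list. This is a sketch over an in-memory dict, not the Redis implementation; `gen_records` and `count_failed` are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, Dict


async def gen_records(store: Dict[str, dict]) -> AsyncIterator[dict]:
    """Yield records one at a time, as a stand-in for a db scan."""
    for key in list(store):
        await asyncio.sleep(0)  # yield control, as a real db read would
        yield store[key]


async def count_failed(store: Dict[str, dict]) -> int:
    """Consume the generator without holding all records in memory."""
    failed = 0
    async for record in gen_records(store):
        if record["status"] == "FAILED":
            failed += 1
    return failed


store = {
    "id-1": {"status": "SUCCEEDED"},
    "id-2": {"status": "FAILED"},
    "id-3": {"status": "FAILED"},
}
n_failed = asyncio.run(count_failed(store))
```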
- async count_by_job_status() collections.Counter[source]
Count all jobs by jobStatus
- Returns
a Counter of jobs by job status (could contain multiple entries for the same jobName, if it is run more than once)
- property db_alive: bool
- property db_info: Dict
- property db_semaphore: asyncio.locks.Semaphore
A semaphore to limit requests to the db
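How a semaphore limits concurrent requests can be sketched with asyncio alone. This is an illustration, not the library's code; the limit of 2, the sleep that stands in for a db request, and the names `limited_query` and `run_queries` are all assumptions.

```python
import asyncio


async def limited_query(sem: asyncio.Semaphore, state: dict, key: str) -> str:
    """Run one simulated db request while holding the semaphore."""
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for a db request
        state["active"] -= 1
        return key


async def run_queries(limit: int = 2, n_requests: int = 6) -> dict:
    """Issue several requests concurrently and record the peak concurrency."""
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    keys = [f"job-{i}" for i in range(n_requests)]
    state["results"] = await asyncio.gather(
        *(limited_query(sem, state, key) for key in keys)
    )
    return state


state = asyncio.run(run_queries())
```

The recorded peak never exceeds the semaphore limit, even though all six coroutines are scheduled at once.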
- async find_by_job_id(job_id: str) Optional[Dict][source]
Find one job by the jobId
- Parameters
job_id – a batch jobId
- Returns
the job data or None
- async find_by_job_name(job_name: str) List[Dict][source]
Find any jobs matching the jobName
- Parameters
job_name – a batch jobName
- Returns
a list of dictionaries containing job data
- async find_by_job_status(job_states: List[str]) List[aio_aws.aws_batch_models.AWSBatchJob][source]
Find any jobs matching any jobStatus values
- Parameters
job_states – a list of valid job status values
- Returns
a list of AWSBatchJob (could contain multiple entries with the same jobName if it is run multiple times with any jobStatus in the jobStatus values)
- async find_job_logs(job_id: str) Optional[Dict][source]
Find job logs by job_id
- Parameters
job_id – str
- Returns
job logs document
- async find_jobs_to_run() List[aio_aws.aws_batch_models.AWSBatchJob][source]
Find all jobs that have not SUCCEEDED.
- async find_latest_job_name(job_name: str) Optional[aio_aws.aws_batch_models.AWSBatchJob][source]
Find the latest job matching the jobName, based on the “createdAt” time stamp.
- Parameters
job_name – a batch jobName
- Returns
the latest job record available
- async gen_all_jobs() AsyncIterator[aio_aws.aws_batch_models.AWSBatchJob][source]
Generate all jobs.
- async group_by_job_status() Dict[str, List[Tuple[str, str, str]]][source]
Group all jobId by jobStatus
- Returns
a dictionary of job info by job status (could contain multiple entries with the same jobName if it is run multiple times); the dictionary values contain a list of job information tuples, e.g.
{ status: [ (job.job_id, job.job_name, job.status), ] }
- property jobs_db
- property jobs_db_alive: Redis
- jobs_db_n: int = 2
- async jobs_recovery(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], include_logs: bool = False) List[aio_aws.aws_batch_models.AWSBatchJob][source]
Use the job.job_name to find any jobs_db records to recover job data.
- Parameters
jobs – a List[AWSBatchJob] to recover
include_logs – also recover any saved logs
- Returns
List[AWSBatchJob] with saved data for any saved jobs
- async jobs_to_run(jobs: List[aio_aws.aws_batch_models.AWSBatchJob]) List[aio_aws.aws_batch_models.AWSBatchJob][source]
Use the job.job_name to find any jobs_db records and check the job status on the latest submission of any job matching the job-name. For any job that has a matching job-name in the jobs_db, check whether the job has already run and SUCCEEDED. Any jobs that have already run and SUCCEEDED are not returned.
Return all jobs that have not run yet or have not SUCCEEDED. Note that any jobs subsequently input to the job-manager will not re-run if they already have a job.job_id; those jobs will be monitored until complete. To re-run jobs that are returned from this filter function, first use job.reset() before passing the jobs to the job-manager.
Note
The input jobs are not modified in any way in this function, i.e. there are no updates from the jobs_db applied to the input jobs.
To recover jobs from the jobs_db, use:
- AWSBatchDB.find_jobs_to_run()
- AWSBatchDB.jobs_db.all()
- property logs_db
- property logs_db_alive: Redis
- logs_db_n: int = 4
- max_connections: int = 1
- redis_url: str = 'redis://127.0.0.1:6379'
- async remove_by_job_id(job_id: str) Optional[Dict][source]
Remove any job matching the jobId
- Parameters
job_id – a batch jobId
- Returns
a deleted document
- async remove_by_job_name(job_name: str) Set[str][source]
Remove any jobs matching the jobName
- Parameters
job_name – a batch jobName
- Returns
a set of deleted job-ids
- async save_job(job: aio_aws.aws_batch_models.AWSBatchJob) Optional[str][source]
Insert or update a job (if it has a job_id)
- Parameters
job – an AWSBatchJob
- Returns
the jobId if the job is saved
- async save_job_logs(job: aio_aws.aws_batch_models.AWSBatchJob) List[int][source]
Insert or update job logs (if the job has a job_id)
- Parameters
job – an AWSBatchJob
- Returns
a List[tinydb.database.Document.doc_id]
- class aio_aws.aio_aws_batch_db.AioAWSBatchTinyDB(jobs_db_file: str = '/tmp/aws_batch_jobs.json', logs_db_file: str = '/tmp/aws_batch_logs.json')[source]
Bases: aio_aws.aio_aws_batch_db.AioAWSBatchDB
AWS Batch job databases - TinyDB implementation
The jobs and logs are kept in separate DBs so that performance on the jobs_db is not compromised by too much data from job logs. This makes it possible to use the jobs_db without capturing logs as well.
The batch data is a TinyDB json file, e.g.
>>> import json
>>> jobs_db_file = '/tmp/aio_batch_jobs.json'
>>> logs_db_file = '/tmp/aio_batch_logs.json'
>>> with open(jobs_db_file) as job_file:
...     batch_data = json.load(job_file)
...
>>> len(batch_data['aws-batch-jobs'])
5
>>> import tinydb
>>> tinydb.TinyDB.DEFAULT_TABLE = "aws-batch-jobs"
>>> tinydb.TinyDB.DEFAULT_TABLE_KWARGS = {"cache_size": 0}
>>> # the jobs and logs are kept in separate DBs so
>>> # that performance on the jobs_db is not compromised
>>> # by too much data from job logs.
>>> jobs_db = tinydb.TinyDB(jobs_db_file)
>>> logs_db = tinydb.TinyDB(logs_db_file)
- async all_jobs() List[aio_aws.aws_batch_models.AWSBatchJob][source]
Collect all jobs.
Warning: this could exceed memory; try to use gen_all_jobs() wherever possible.
- async count_by_job_status() collections.Counter[source]
Count all jobs by jobStatus
- Returns
a Counter of jobs by job status (could contain multiple entries for the same jobName, if it is run more than once)
- property db_semaphore: asyncio.locks.Semaphore
A semaphore to limit requests to the db
- async find_by_job_id(job_id: str) Optional[tinydb.database.Document][source]
Find one job by the jobId
- Parameters
job_id – a batch jobId
- Returns
the AWSBatchJob.job_data() or None
- async find_by_job_name(job_name: str) List[tinydb.database.Document][source]
Find any jobs matching the jobName
- Parameters
job_name – a batch jobName
- Returns
a list of documents containing AWSBatchJob.job_data()
- async find_by_job_status(job_states: List[str]) List[aio_aws.aws_batch_models.AWSBatchJob][source]
Find any jobs matching any jobStatus values
- Parameters
job_states – a list of valid job status values
- Returns
a list of AWSBatchJob (could contain multiple entries with the same jobName if it is run multiple times with any jobStatus in the jobStatus values)
- async find_job_logs(job_id: str) Optional[tinydb.database.Document][source]
Find job logs by job_id
- Parameters
job_id – str
- Returns
a tinydb.database.Document with job logs
- async find_jobs_to_run() List[aio_aws.aws_batch_models.AWSBatchJob][source]
Find all jobs that have not SUCCEEDED. Note that any jobs handled by the job-manager will not re-run if they have a job.job_id; those jobs will be monitored until complete.
- async find_latest_job_name(job_name: str) Optional[aio_aws.aws_batch_models.AWSBatchJob][source]
Find the latest job matching the jobName, based on the “createdAt” time stamp.
- Parameters
job_name – a batch jobName
- Returns
the latest job record available
- async gen_all_jobs() AsyncIterator[aio_aws.aws_batch_models.AWSBatchJob][source]
Generate all jobs.
- async group_by_job_status() Dict[str, List[Tuple[str, str, str]]][source]
Group all jobId by jobStatus
- Returns
a dictionary of job info by job status (could contain multiple entries with the same jobName if it is run multiple times); the dictionary values contain a list of job information tuples, e.g.
{ status: [ (job.job_id, job.job_name, job.status), ] }
- jobs_db_file: str = '/tmp/aws_batch_jobs.json'
a file used for TinyDB(jobs_db_file)
- async jobs_recovery(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], include_logs: bool = False) List[aio_aws.aws_batch_models.AWSBatchJob][source]
Use the job.job_name to find any jobs_db records to recover job data.
- Parameters
jobs – a List[AWSBatchJob] to recover
include_logs – also recover any saved logs
- Returns
List[AWSBatchJob] with saved data for any saved jobs
- async jobs_to_run(jobs: List[aio_aws.aws_batch_models.AWSBatchJob]) List[aio_aws.aws_batch_models.AWSBatchJob][source]
Use the job.job_name to find any jobs_db records and check the job status on the latest submission of any job matching the job-name. For any job that has a matching job-name in the jobs_db, check whether the job has already run and SUCCEEDED. Any jobs that have already run and SUCCEEDED are not returned.
Return all jobs that have not run yet or have not SUCCEEDED. Note that any jobs subsequently input to the job-manager will not re-run if they already have a job.job_id; those jobs will be monitored until complete. To re-run jobs that are returned from this filter function, first use job.reset() before passing the jobs to the job-manager.
Note
The input jobs are not modified in any way in this function, i.e. there are no updates from the jobs_db applied to the input jobs.
To recover jobs from the jobs_db, use:
- AWSBatchDB.find_jobs_to_run()
- AWSBatchDB.jobs_db.all()
- logs_db_file: str = '/tmp/aws_batch_logs.json'
a file used for TinyDB(logs_db_file)
- async remove_by_job_id(job_id: str) Optional[tinydb.database.Document][source]
Remove any job matching the jobId
- Parameters
job_id – a batch jobId
- Returns
a deleted document
- async remove_by_job_name(job_name: str) Set[str][source]
Remove any jobs matching the jobName
- Parameters
job_name – a batch jobName
- Returns
a set of deleted job-ids
- async save_job(job: aio_aws.aws_batch_models.AWSBatchJob) Optional[str][source]
Insert or update a job (if it has a job_id)
- Parameters
job – an AWSBatchJob
- Returns
the jobId if the job is saved
- async save_job_logs(job: aio_aws.aws_batch_models.AWSBatchJob) Optional[str][source]
Insert or update job logs (if the job has a job_id)
- Parameters
job – an AWSBatchJob
- Returns
a job.job_id if the logs are saved
AioAWS Batch Models
Batch Job metadata models.
- class aio_aws.aws_batch_models.AWSBatchJob(job_name: str, job_queue: str, job_definition: str, command: Optional[List[str]] = None, depends_on: Optional[List[Dict]] = None, container_overrides: Optional[Dict] = None, job_id: Optional[str] = None, status: Optional[str] = None, job_tries: Optional[List[str]] = None, num_tries: int = 0, max_tries: int = 4, job_submission: Optional[Dict] = None, job_description: Optional[Dict] = None, logs: Optional[List[Dict]] = None)[source]
Bases: object
AWS Batch job
Creating an AWSBatchJob instance does not run anything; it is simply a dataclass to retain and track job attributes. There are no instance methods to run a job; the instances are passed to async coroutine functions.
Replace ‘command’ with ‘container_overrides’ dict for more options; do not use ‘command’ together with ‘container_overrides’; if both are given, any valid ‘command’ value will replace the container_overrides[‘command’].
- Parameters
job_name – A job jobName (truncated to 128 characters).
job_definition – A job job_definition.
job_queue – A batch queue.
command – A container command.
depends_on –
list of dictionaries like:
[ {'jobId': 'abc123', ['type': 'N_TO_N' | 'SEQUENTIAL'] }, ]
type is optional, used only for job arrays
container_overrides –
a dictionary of container overrides. Overrides include ‘instanceType’, ‘environment’, and ‘resourceRequirements’.
If the command parameter is defined, it overrides container_overrides[‘command’]
To override VCPU and MEMORY requirements in the ResourceRequirement of the job definition, a ResourceRequirement must be specified in the SubmitJob request.
https://docs.aws.amazon.com/batch/latest/APIReference/API_ResourceRequirement.html
VCPU (int): This parameter maps to CpuShares in the Create a container section of the Docker Remote API and the --cpu-shares option to docker run. Each vCPU is equivalent to 1,024 CPU shares. This parameter is supported for jobs that run on EC2 resources, but isn’t supported for jobs that run on Fargate resources.
MEMORY (int): This parameter indicates the amount of memory (in MiB) that is reserved for the job. This parameter is supported for jobs that run on EC2 resources, but isn’t supported for jobs that run on Fargate resources.
container_overrides = {
    "resourceRequirements": [
        {"type": "VCPU", "value": "2"},
        {"type": "MEMORY", "value": str(int(gib_to_mib(6)))},
    ]
}
max_tries – an optional limit to the number of job retries, which can apply in the job-manager function to any job with a SPOT failure only and it applies regardless of the job-definition settings.
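The interaction between command and container_overrides can be sketched as a small merge function. This is an illustration under stated assumptions, not the library's implementation: `build_overrides` is a hypothetical name, and `gib_to_mib` is defined here as a stand-in that multiplies by 1024.

```python
from typing import Dict, List, Optional


def gib_to_mib(gib: float) -> float:
    """Convert GiB to MiB (1 GiB = 1024 MiB); stand-in helper for this sketch."""
    return gib * 1024


def build_overrides(
    vcpus: int,
    memory_gib: float,
    command: Optional[List[str]] = None,
    overrides: Optional[Dict] = None,
) -> Dict:
    """Merge resource requirements and an optional command into a copy of the overrides."""
    merged = dict(overrides or {})
    # AWS Batch expects resource requirement values as strings
    merged["resourceRequirements"] = [
        {"type": "VCPU", "value": str(vcpus)},
        {"type": "MEMORY", "value": str(int(gib_to_mib(memory_gib)))},
    ]
    if command:
        # a given command replaces any command already in the overrides
        merged["command"] = command
    return merged


original = {"command": ["old"], "environment": []}
result = build_overrides(2, 6, command=["echo", "hi"], overrides=original)
```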
- STATES = ['SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING', 'RUNNING', 'SUCCEEDED', 'FAILED']
- allow_submit_job() bool[source]
Logic to determine whether a job can be submitted or resubmitted:
jobs with an existing job.job_id should skip submission to avoid resubmission for the same job
jobs with too many tries cannot be resubmitted
AWSBatchJob.reset() can be applied in order to resubmit any job that would otherwise not be allowed.
- Returns
True if it is allowed
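The two rules listed for allow_submit_job can be sketched as a standalone function. This is an illustration only; the function name `allow_submit` is hypothetical, and treating "too many tries" as `num_tries >= max_tries` is an assumption rather than something the documentation states.

```python
from typing import Optional


def allow_submit(job_id: Optional[str], num_tries: int, max_tries: int = 4) -> bool:
    """Return True if a job may be submitted under the two documented rules."""
    if job_id:
        # an existing job_id means the job was already submitted
        return False
    if num_tries >= max_tries:
        # assumed threshold for "too many tries"
        return False
    return True


new_job_ok = allow_submit(job_id=None, num_tries=0)
submitted_job_ok = allow_submit(job_id="abc123", num_tries=1)
exhausted_job_ok = allow_submit(job_id=None, num_tries=4)
```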
- command: List[str] = None
- container_overrides: Dict = None
- property created: Optional[int]
Timestamp (milliseconds since the epoch) for job createdAt; this depends on updates to the jobDescription.
The Unix timestamp (in milliseconds) for when the job was created. For non-array jobs and parent array jobs, this is when the job entered the SUBMITTED state (at the time SubmitJob was called). For array child jobs, this is when the child job was spawned by its parent and entered the PENDING state.
- property created_datetime: Optional[datetime.datetime]
Datetime for job createdAt; this depends on updates to the jobDescription.
- property db_data: Dict
AWS Batch job data for state machine persistence
- property db_logs_data: Optional[Dict]
AWS Batch job with logs persistence
- depends_on: List[Dict] = None
- property elapsed: Optional[int]
Elapsed time (milliseconds) for the job; this depends on updates to the jobDescription; it is the difference between createdAt and stoppedAt (this can include long periods in a job queue).
- job_definition: str
- job_description: Optional[Dict] = None
- job_id: Optional[str] = None
- job_name: str
- job_queue: str
- job_submission: Optional[Dict] = None
- job_tries: List[str] = None
- logs: Optional[List[Dict]] = None
- max_tries: int = 4
- num_tries: int = 0
- property params
AWS Batch parameters for job submission
- property runtime: Optional[int]
Run time (milliseconds) for the job; this depends on updates to the jobDescription; it is the difference between startedAt and stoppedAt.
- property spinup: Optional[int]
Startup time (milliseconds) for the job; this depends on updates to the jobDescription; it is the difference between createdAt and startedAt.
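The elapsed, runtime and spinup values are differences between the createdAt, startedAt and stoppedAt timestamps. A minimal sketch of that arithmetic follows, assuming a job description dict with those keys; the helper name `duration_ms` and the sample values are hypothetical.

```python
from typing import Dict, Optional


def duration_ms(desc: Dict, start_key: str, stop_key: str) -> Optional[int]:
    """Difference in milliseconds, or None if either timestamp is missing."""
    start, stop = desc.get(start_key), desc.get(stop_key)
    if start is None or stop is None:
        return None
    return stop - start


desc = {
    "createdAt": 1586228000000,
    "startedAt": 1586228090000,  # 90 seconds after creation
    "stoppedAt": 1586228150000,  # 60 seconds after starting
}
spinup = duration_ms(desc, "createdAt", "startedAt")
runtime = duration_ms(desc, "startedAt", "stoppedAt")
elapsed = duration_ms(desc, "createdAt", "stoppedAt")
```

With these values, elapsed equals spinup plus runtime, and a job that has not stopped yet yields None for runtime and elapsed.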
- property started: Optional[int]
Timestamp (milliseconds since the epoch) for job startedAt; this depends on updates to the jobDescription.
The Unix timestamp (in milliseconds) for when the job was started (when the job transitioned from the STARTING state to the RUNNING state). This parameter isn’t provided for child jobs of array jobs or multi-node parallel jobs.
- property started_datetime: Optional[datetime.datetime]
Datetime for job startedAt; this depends on updates to the jobDescription.
- status: Optional[str] = None
- property stopped: Optional[int]
Timestamp (milliseconds since the epoch) for job stoppedAt; this depends on updates to the jobDescription.
The Unix timestamp (in milliseconds) for when the job was stopped (when the job transitioned from the RUNNING state to a terminal state, such as SUCCEEDED or FAILED).
- property stopped_datetime: Optional[datetime.datetime]
Datetime for job stoppedAt; this depends on updates to the jobDescription.
- property submitted: Optional[int]
Timestamp (milliseconds since the epoch) for job submission
- property submitted_datetime: Optional[datetime.datetime]
Datetime for job submission; this depends on setting the job_submission.
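The datetime properties correspond to the millisecond timestamps above. A conversion can be sketched as follows; using UTC is an assumption of this sketch (the documentation does not state the time zone), and `ms_to_datetime` is a hypothetical name.

```python
from datetime import datetime, timezone
from typing import Optional


def ms_to_datetime(ms: Optional[int]) -> Optional[datetime]:
    """Convert milliseconds since the epoch to a timezone-aware UTC datetime."""
    if ms is None:
        return None
    return datetime.fromtimestamp(ms / 1000.0, tz=timezone.utc)


created_at = 1586228000000
created_datetime = ms_to_datetime(created_at)
```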
- class aio_aws.aws_batch_models.AWSBatchJobDescription(jobName: str = None, jobId: str = None, jobQueue: str = None, status: str = None, attempts: List[Dict] = None, statusReason: str = None, createdAt: int = None, startedAt: int = None, stoppedAt: int = None, dependsOn: List[str] = None, jobDefinition: str = None, parameters: Dict = None, container: Dict = None, timeout: Dict = None)[source]
Bases: object
- attempts: List[Dict] = None
- container: Dict = None
- createdAt: int = None
- dependsOn: List[str] = None
- jobDefinition: str = None
- jobId: str = None
- jobName: str = None
- jobQueue: str = None
- parameters: Dict = None
- startedAt: int = None
- status: str = None
- statusReason: str = None
- stoppedAt: int = None
- timeout: Dict = None
- class aio_aws.aws_batch_models.AWSBatchJobStates(value)[source]
Bases: enum.Enum
An enumeration.
- FAILED = 7
- PENDING = 2
- RUNNABLE = 3
- RUNNING = 5
- STARTING = 4
- SUBMITTED = 1
- SUCCEEDED = 6
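An enumeration with these values makes it straightforward to order states and to test for terminal states. The sketch below re-declares the values in a stand-in class named `JobStates` so that it runs on its own; the helper `is_terminal` is hypothetical.

```python
from enum import Enum


class JobStates(Enum):
    """Stand-in with the same names and values as AWSBatchJobStates."""

    SUBMITTED = 1
    PENDING = 2
    RUNNABLE = 3
    STARTING = 4
    RUNNING = 5
    SUCCEEDED = 6
    FAILED = 7


def is_terminal(status: str) -> bool:
    """A job in SUCCEEDED or FAILED will not change state again."""
    return JobStates[status] in (JobStates.SUCCEEDED, JobStates.FAILED)


# Order status names by their enumeration value
ordered = sorted(["RUNNING", "SUBMITTED", "FAILED"], key=lambda s: JobStates[s].value)
```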
- aio_aws.aws_batch_models.gb_to_gib(gb: Union[int, float]) float[source]
Convert gigabytes (GB) to gibibytes (GiB)
- Parameters
gb – gigabytes (GB)
- Returns
gibibytes (GiB)
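The conversion follows from the unit definitions: one gigabyte is 10^9 bytes and one gibibyte is 2^30 bytes. The sketch below shows that arithmetic; it is written independently and may differ from the library's function in rounding or input handling.

```python
from typing import Union


def gb_to_gib_sketch(gb: Union[int, float]) -> float:
    """Convert gigabytes (10**9 bytes) to gibibytes (2**30 bytes)."""
    return gb * 10**9 / 2**30


one_gb_in_gib = gb_to_gib_sketch(1)
```

One gigabyte is therefore slightly less than one gibibyte, which matters when a memory request given in GB is converted to the MiB values that AWS Batch expects.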
AioAWS Lambda
Manually create a ‘lambda_dev’ function with simple code like:
# lambda_dev/lambda_function.py
import json

def lambda_handler(event, context):
    csv = ", ".join([str(i) for i in range(10)])
    print(csv)  # to log something
    return {
        'statusCode': 200,
        'body': json.dumps(csv)
    }
The aio_aws_lambda.py module has a main script to call this simple lambda function. This lambda function easily runs within a 128 MB memory allocation and it runs in less time than the minimal billing period, so it’s as cheap as it can be and could fit within a monthly free quota.
$ ./aio_aws/aio_aws_lambda.py
Test async lambda functions
[INFO] 2020-04-07T03:10:21.741Z aio-aws:aio_lambda_invoke:114 AWS Lambda (lambda_dev) invoked OK
1 lambdas finished in 0.31 seconds.
$ N_LAMBDAS=1000 ./aio_aws/aio_aws_lambda.py
# snipped logging messages
1000 lambdas finished in 14.95 seconds.
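Before deploying, the 'lambda_dev' handler can be exercised locally as an ordinary function. The sketch below repeats the handler so that it is self-contained; passing an empty event and `None` for the context is an assumption that holds only because this handler ignores both arguments.

```python
import json


def lambda_handler(event, context):
    """Same logic as the 'lambda_dev' function above."""
    csv = ", ".join([str(i) for i in range(10)])
    print(csv)  # to log something
    return {"statusCode": 200, "body": json.dumps(csv)}


# Call the handler directly; no AWS resources are involved
result = lambda_handler({}, None)
body = json.loads(result["body"])
```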
- class aio_aws.aio_aws_lambda.AWSLambdaFunction(name: str, type: str = 'RequestResponse', log_type: str = 'None', context: Optional[str] = None, payload: bytes = b'', qualifier: Optional[str] = None, response: Optional[Dict] = None, data: Optional[bytes] = None)[source]
Bases: object
AWS Lambda Function
Creating an AWSLambdaFunction instance does not create or invoke anything; it is a dataclass to retain and track function parameters and response data.
- Parameters
name – A lambda FunctionName (truncated to 64 characters).
type – the type of function invocation (“RequestResponse”, “Event”, “DryRun”)
log_type – the type of function logging (“None” or “Tail”)
context – the client context; for an example of a ClientContext JSON, see PutEvents in the Amazon Mobile Analytics API Reference and User Guide. The ClientContext JSON must be base64-encoded and has a maximum size of 3583 bytes.
payload – an input payload (bytes or seekable file-like object); JSON that you want to provide to your Lambda function as input.
qualifier – an optional function version or alias name; If you specify a function version, the API uses the qualified function ARN to invoke a specific Lambda function. If you specify an alias name, the API uses the alias ARN to invoke the Lambda function version to which the alias points. If you don’t provide this parameter, then the API uses unqualified function ARN which results in invocation of the $LATEST version.
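Preparing the context and payload values can be sketched with the standard library. This is an illustration only: `encode_client_context` is a hypothetical helper, the sample context keys are made up, and applying the 3583 byte limit to the base64-encoded string (rather than to the raw JSON) is an assumption.

```python
import base64
import json


def encode_client_context(context: dict, max_bytes: int = 3583) -> str:
    """Serialize a client context to JSON and base64-encode it."""
    raw = json.dumps(context).encode("utf-8")
    encoded = base64.b64encode(raw).decode("ascii")
    if len(encoded) > max_bytes:
        raise ValueError(f"client context exceeds {max_bytes} bytes")
    return encoded


# A payload is JSON serialized to bytes
payload = json.dumps({"i": 1}).encode()

# Hypothetical context content, for illustration only
context = encode_client_context({"custom": {"source": "example"}})
decoded = json.loads(base64.b64decode(context))
```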
- LOG_TYPES = ['None', 'Tail']
- TYPES = ['RequestResponse', 'Event', 'DryRun']
- property content_length: Optional[int]
- property content_type: Optional[str]
- context: str = None
- data: bytes = None
- property error
This could be a lambda function error or a client error
- async invoke(config: aio_aws.aio_aws_config.AioAWSConfig, lambda_client: aiobotocore.client.AioBaseClient) aio_aws.aio_aws_lambda.AWSLambdaFunction[source]
Asynchronous coroutine to invoke a lambda function; this updates the response and calls the read_response() method to handle the response.
- Parameters
config – aio session and client settings
lambda_client – aio client for lambda
- Returns
a lambda response
- Raises
botocore.exceptions.ClientError, botocore.exceptions.ParamValidationError
- property json
- log_type: str = 'None'
- property logs
- name: str
- property params
AWS Lambda parameters to invoke function
- payload: bytes = b''
- qualifier: str = None
- async read_response()[source]
Asynchronous coroutine to read a lambda response; this updates the data attribute.
- Raises
botocore.exceptions.ClientError, botocore.exceptions.ParamValidationError
- response: Dict = None
- property response_headers: Optional[Dict]
- property response_metadata: Optional[Dict]
- property status_code: Optional[int]
- property text
- type: str = 'RequestResponse'
- async aio_aws.aio_aws_lambda.run_lambda_function_thread_pool(lambda_functions: List[aio_aws.aio_aws_lambda.AWSLambdaFunction], n_tasks: int = 4)[source]
- async aio_aws.aio_aws_lambda.run_lambda_functions(lambda_functions: List[aio_aws.aio_aws_lambda.AWSLambdaFunction])[source]
Use some default config settings to run lambda functions
lambda_funcs = []
for i in range(5):
    event = {"i": i}
    payload = json.dumps(event).encode()
    func = AWSLambdaFunction(name="lambda_dev", payload=payload)
    lambda_funcs.append(func)
asyncio.run(run_lambda_functions(lambda_funcs))
for func in lambda_funcs:
    assert response_success(func.response)
- Returns
it returns nothing; each lambda function will contain the results of any lambda response in func.response and func.data
AioAWS S3
Example code for async AWS S3 services. The aiobotocore library wraps a
release of botocore to patch it with features for async coroutines using
asyncio and aiohttp. The simple example code iterates on a list of
buckets or objects to issue a HEAD request, as a simple way to determine
whether access is permitted. The example compares an async approach (using
aiobotocore) vs a sequential blocking approach (using botocore). The async
approach uses a client connection limiter, based on asyncio.Semaphore(20).
$ ./aio_aws/aio_aws_s3.py
aio-aws check an s3 object that exists
GET HEAD s3://noaa-goes16/index.html -> {'ResponseMetadata': {'RequestId': 'A2F4DF282C84341B', 'HostId': 'Rt45GH1taNRVUAPTg8XJfs4KLvri0OKos/6Wohp5tAfQa+moUT/9mC/0Wa9cYVQZcMAWKtIYfkE=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'Rt45GH1taNRVUAPTg8XJfs4KLvri0OKos/6Wohp5tAfQa+moUT/9mC/0Wa9cYVQZcMAWKtIYfkE=', 'x-amz-request-id': 'A2F4DF282C84341B', 'date': 'Tue, 03 Mar 2020 19:22:21 GMT', 'last-modified': 'Wed, 12 Feb 2020 19:14:27 GMT', 'etag': '"8f9810548f75fd8b8a16431c74ad54ac"', 'content-encoding': 'text/html', 'accept-ranges': 'bytes', 'content-type': 'text/html', 'content-length': '32356', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'AcceptRanges': 'bytes', 'LastModified': datetime.datetime(2020, 2, 12, 19, 14, 27, tzinfo=tzutc()), 'ContentLength': 32356, 'ETag': '"8f9810548f75fd8b8a16431c74ad54ac"', 'ContentEncoding': 'text/html', 'ContentType': 'text/html', 'Metadata': {}}
aio-aws check an s3 object that is missing; it raises.
GET HEAD s3://noaa-goes16/missing.html -> 404: object missing
aio-aws find all buckets that allow access.
queried 72 buckets
found 69 accessible buckets
finished in 2.42 seconds.
aio-aws collection of all objects in a bucket-prefix.
found 9015 s3 objects
finished in 4.43 seconds.
aws-sequential find all buckets that allow access.
queried 72 buckets
found 69 accessible buckets
finished in 31.11 seconds.
aws-sequential collection for all objects in a bucket-prefix
found 9015 s3 objects
finished in 35.72 seconds.
Checking equality of collections for aio-aws vs. aws-sync
The collections for aio-aws vs. aws-sync match OK
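The difference between the concurrent and sequential timings can be reproduced without AWS by replacing each HEAD request with a short sleep. This sketch is not the module's code; `fake_head`, the 0.02 second delay and the bucket names are stand-ins, and only the limit of 20 is taken from the description above.

```python
import asyncio
import time
from typing import Dict, List, Tuple


async def fake_head(name: str, sem: asyncio.Semaphore) -> Tuple[str, bool]:
    """Stand-in for a HEAD request, limited by a semaphore."""
    async with sem:
        await asyncio.sleep(0.02)  # simulated network latency
        return name, True


async def check_concurrent(names: List[str], limit: int = 20) -> Dict[str, bool]:
    """Issue all requests at once, at most `limit` in flight."""
    sem = asyncio.Semaphore(limit)
    pairs = await asyncio.gather(*(fake_head(n, sem) for n in names))
    return dict(pairs)


async def check_sequential(names: List[str]) -> Dict[str, bool]:
    """Issue the requests one after another."""
    sem = asyncio.Semaphore(1)
    results = {}
    for n in names:
        name, ok = await fake_head(n, sem)
        results[name] = ok
    return results


names = [f"bucket-{i}" for i in range(10)]

t0 = time.perf_counter()
concurrent = asyncio.run(check_concurrent(names))
t_concurrent = time.perf_counter() - t0

t0 = time.perf_counter()
sequential = asyncio.run(check_sequential(names))
t_sequential = time.perf_counter() - t0
```

Both approaches return the same results; the sequential version takes roughly the sum of the delays, while the concurrent version takes roughly one delay per batch of `limit` requests.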
- async aio_aws.aio_aws_s3.aio_s3_access(s3_uris: Iterable[str]) Dict[str, bool][source]
Asynchronous coroutine to issue HEAD requests on all available s3 objects to check if each s3_object allows access.
- Parameters
s3_uris – an iterable of s3 uris in the form ‘s3://bucket-name/key’
- Returns
dict of
{s3_uri: str, access: bool}
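An s3 URI in the form 's3://bucket-name/key' can be split into a bucket name and a key with the standard library. This is a sketch; the function name `split_s3_uri` is hypothetical and the library may parse URIs differently.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(s3_uri: str) -> Tuple[str, str]:
    """Split 's3://bucket-name/key' into (bucket, key)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an s3 URI: {s3_uri}")
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_uri("s3://noaa-goes16/index.html")
```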
- async aio_aws.aio_aws_s3.aio_s3_bucket_access(bucket_name: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) Tuple[str, bool][source]
Asynchronous coroutine to issue a HEAD request to check if s3 bucket access is allowed.
- Parameters
bucket_name – an s3 bucket name
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
a tuple of
(bucket_name: str, access: bool)
- async aio_aws.aio_aws_s3.aio_s3_bucket_head(bucket_name: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) Dict[source]
Asynchronous coroutine to issue a HEAD request for s3 bucket.
- Parameters
bucket_name – an s3 bucket name
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
a response to a HEAD request
- Raises
botocore.exceptions.ClientError
- async aio_aws.aio_aws_s3.aio_s3_buckets_access(buckets: List[str], config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) Dict[str, bool][source]
Asynchronous coroutine to issue HEAD requests on all available s3 buckets to check if each bucket allows access.
- Parameters
buckets – a list of bucket names to check; the default is to issue a request for all buckets in the session region
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
dict of
{bucket_name: str, access: bool}
- async aio_aws.aio_aws_s3.aio_s3_buckets_list(config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) Dict[source]
Asynchronous coroutine to list all buckets.
- Parameters
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
a response to a ListBuckets request
- async aio_aws.aio_aws_s3.aio_s3_object_access(s3_uri: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) Tuple[str, bool][source]
Asynchronous coroutine to issue a HEAD request to check if s3 object access is allowed.
- Parameters
s3_uri – an s3 URI
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
a tuple of
(s3_uri: str, access: bool)
- async aio_aws.aio_aws_s3.aio_s3_object_head(s3_uri: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) Dict[source]
Asynchronous coroutine to issue a HEAD request for s3 object.
- Parameters
s3_uri – an s3 URI
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
a response to a HEAD request
- Raises
botocore.exceptions.ClientError
- async aio_aws.aio_aws_s3.aio_s3_objects_access(s3_uris: Iterable[str], config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) Dict[str, bool][source]
Asynchronous coroutine to issue HEAD requests on all available s3 objects to check if each s3_object allows access.
- Parameters
s3_uris – a list of s3 uris in the form ‘s3://bucket-name/key’
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
dict of
{s3_uri: str, access: bool}
- async aio_aws.aio_aws_s3.aio_s3_objects_list(bucket_name: str, bucket_prefix: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) List[Dict][source]
Asynchronous coroutine to collect all objects in a bucket prefix.
- Parameters
bucket_name – param passed to s3_client.list_objects_v2 ‘Bucket’
bucket_prefix – param passed to s3_client.list_objects_v2 ‘Prefix’
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
a list of s3 object data, e.g.
>>> aio_s3_objects[0]
{'ETag': '"192e29f360ea8297b5876b33b8419741"',
 'Key': 'ABI-L2-ADPC/2019/337/13/OR_ABI-L2-ADPC-M6_G16_s20193371331167_e20193371333539_c20193371334564.nc',
 'LastModified': datetime.datetime(2019, 12, 3, 14, 27, 5, tzinfo=tzutc()),
 'Size': 892913,
 'StorageClass': 'INTELLIGENT_TIERING'}
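Collecting every object under a prefix requires following continuation tokens, because list_objects_v2 returns results in pages. The loop below is a sketch of that pattern, not the module's implementation; `list_all_objects` is a hypothetical name and `FakeS3Client` is an in-memory stand-in so the example runs without AWS.

```python
import asyncio
from typing import Dict, List, Optional


async def list_all_objects(s3_client, bucket: str, prefix: str) -> List[Dict]:
    """Follow continuation tokens until the listing is no longer truncated."""
    objects: List[Dict] = []
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    while True:
        response = await s3_client.list_objects_v2(**kwargs)
        objects.extend(response.get("Contents", []))
        if not response.get("IsTruncated"):
            break
        kwargs["ContinuationToken"] = response["NextContinuationToken"]
    return objects


class FakeS3Client:
    """In-memory stand-in that serves keys in small pages."""

    def __init__(self, keys: List[str], page_size: int = 2):
        self.keys = keys
        self.page_size = page_size

    async def list_objects_v2(
        self, Bucket: str, Prefix: str, ContinuationToken: Optional[str] = None
    ) -> Dict:
        start = int(ContinuationToken or 0)
        matching = [k for k in self.keys if k.startswith(Prefix)]
        page = matching[start : start + self.page_size]
        end = start + len(page)
        response = {
            "Contents": [{"Key": k} for k in page],
            "IsTruncated": end < len(matching),
        }
        if response["IsTruncated"]:
            response["NextContinuationToken"] = str(end)
        return response


client = FakeS3Client(["a/1", "a/2", "a/3", "b/1"])
found = asyncio.run(list_all_objects(client, "demo-bucket", "a/"))
```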
- aio_aws.aio_aws_s3.aws_s3_bucket_access(bucket_name: str, s3_client: botocore.client.BaseClient) bool[source]
Check access to an S3 bucket by issuing a HEAD request
- Parameters
bucket_name – An s3 bucket name
s3_client – botocore.client.BaseClient for s3
- Returns
a tuple of
(bucket_name: str, access: bool)
- aio_aws.aio_aws_s3.aws_s3_buckets_access(s3_client: botocore.client.BaseClient) Dict[str, bool][source]
A concurrent.futures approach to list all s3 buckets and issue a HEAD request to check if access is allowed. This is used in performance comparisons between botocore and aiobotocore functionality.
- Parameters
s3_client – botocore.client.BaseClient for s3
- Returns
a tuple of
(bucket_name: str, access: bool)
- aio_aws.aio_aws_s3.validate_s3_files(s3_uris: Iterable[str]) Dict[str, bool][source]
Synchronous wrapper on aio_s3_access(), the coroutine that issues HEAD requests on all available s3 objects to check if each s3_object allows access. This uses asyncio.run().
- Parameters
s3_uris – an iterable of s3 uris in the form ‘s3://bucket-name/key’
- Returns
dict of
{s3_uri: str, access: bool}
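The synchronous wrapper pattern can be sketched with a stand-in coroutine. This is an illustration of the asyncio.run() approach only; `fake_access` and `validate` are hypothetical names and the access check is simulated.

```python
import asyncio
from typing import Dict, Iterable


async def fake_access(s3_uris: Iterable[str]) -> Dict[str, bool]:
    """Stand-in coroutine; a real check would issue HEAD requests."""
    await asyncio.sleep(0)
    return {uri: uri.startswith("s3://") for uri in s3_uris}


def validate(s3_uris: Iterable[str]) -> Dict[str, bool]:
    """Run the coroutine to completion from synchronous code."""
    return asyncio.run(fake_access(list(s3_uris)))


access = validate(["s3://bucket/key.txt", "not-a-uri"])
```

Because asyncio.run() starts a new event loop, a wrapper like this is intended for synchronous callers and cannot be called from code that is already running inside an event loop.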
S3 AIO
These functions provide non-blocking serialization to files and s3 for GeoJSON, GeoJSONSeq, JSON, and YAML.
- async aio_aws.s3_aio.geojson_s3_dump(geojson_data: Any, s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) Optional[str][source]
Write GeoJSON to an s3 URI
- Parameters
geojson_data – an object to json.dump
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
the s3 URI on success
- async aio_aws.s3_aio.geojson_s3_load(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) Dict[source]
Read GeoJSON data from an s3 object
- Parameters
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
geojson data
- async aio_aws.s3_aio.geojsons_dump(geojson_features: List[Dict], geojsons_file: Union[pathlib.Path, str]) Optional[Union[pathlib.Path, str]][source]
Write a GeoJSON Text Sequence file
- Parameters
geojson_features – a list of geojson features; from any feature collection, this is geojson_collection[“features”]
geojsons_file – a file path to write
- Returns
if the dump succeeds, return the geojsons_file, or None
- async aio_aws.s3_aio.geojsons_s3_dump(geojson_features: List[Dict], s3uri: str, s3_client: aiobotocore.client.AioBaseClient) Optional[str][source]
Write GeoJSON Text Sequence files to an s3 URI
[GeoJSON Text Sequences](https://tools.ietf.org/html/rfc8142) are lines of geojson features that are designed for streaming operations on large datasets. These files can be loaded by geopandas, using fiona driver=”GeoJSONSeq”, which can be auto-detected. For example:
    import geopandas as gpd
    s3_uri = "s3://your-bucket/prefix/input.geojsons"
    gdf = gpd.read_file(s3_uri)
- Parameters
geojson_features – a list of geojson features; from any feature collection, this is geojson_collection[“features”]
s3uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
the s3 URI on success
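To make the file layout concrete, here is a minimal sketch of writing and reading features as one JSON document per line. The function names geojsons_dumps and geojsons_loads are hypothetical. RFC 8142 formally prefixes each record with an RS (0x1E) character; this sketch writes plain newline-delimited records and only tolerates RS on input, which is an assumption and may differ from what the library writes.

```python
import json
from typing import Dict, List


def geojsons_dumps(geojson_features: List[Dict]) -> str:
    """Serialize each feature as compact JSON on its own line."""
    return "".join(
        json.dumps(feature, separators=(",", ":")) + "\n"
        for feature in geojson_features
    )


def geojsons_loads(text: str) -> List[Dict]:
    """Parse one feature per line, ignoring blank lines and RS prefixes."""
    features = []
    for line in text.split("\n"):
        record = line.strip("\x1e \t\r")
        if record:
            features.append(json.loads(record))
    return features


features = [
    {"type": "Feature", "geometry": {"type": "Point", "coordinates": [0, 0]}, "properties": {"id": 1}},
    {"type": "Feature", "geometry": {"type": "Point", "coordinates": [1, 1]}, "properties": {"id": 2}},
]
text = geojsons_dumps(features)
round_trip = geojsons_loads(text)
```

One record per line is what allows a reader to stream a large dataset without parsing the whole file at once.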
- async aio_aws.s3_aio.geojsons_s3_load(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) List[Dict][source]
Read GeoJSON Text Sequence data from an s3 object
- Parameters
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
geojson features
- async aio_aws.s3_aio.get_s3_content(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient)[source]
Read an s3 object
- Parameters
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
the data from the s3 object
- async aio_aws.s3_aio.json_dump(data: Any, file: Union[pathlib.Path, str]) Optional[Union[pathlib.Path, str]][source]
Write JSON to a file
- Parameters
data – any data compatible with json.dumps
file – a file path to write
- Returns
if the dump succeeds, return the file, or None
- async aio_aws.s3_aio.json_s3_dump(json_data: Any, s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) Optional[str][source]
Write JSON to an s3 URI
- Parameters
json_data – an object to json.dump
s3_uri – a fully qualified S3 URI for the s3 object to write
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
the s3 URI on success
- async aio_aws.s3_aio.json_s3_load(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) Any[source]
Read JSON data from an s3 object
- Parameters
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
data from the json load
- async aio_aws.s3_aio.put_s3_content(data_file: str, s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) Optional[str][source]
Write a file to an s3 object
- Parameters
data_file – a data file
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
the s3 URI on success
- aio_aws.s3_aio.run_s3_load_files(s3_uris: List[str], *args, **kwargs) Dict[str, Optional[Any]][source]
Collect data from S3 files in JSON or YAML formats
- Parameters
s3_uris – a list of S3 URIs
- Returns
a Dict[s3_uri: s3_data] for all the s3 URIs that can be read successfully
- aio_aws.s3_aio.s3_aio_client(*args, **kwargs) aiobotocore.client.AioBaseClient[source]
This creates an aiobotocore.client.AioBaseClient that uses aiohttp for asynchronous s3 requests
- Returns
an aiobotocore.client.AioBaseClient for s3
- async aio_aws.s3_aio.s3_file_info(s3_uri: Union[aio_aws.s3_uri.S3URI, str], s3_client: aiobotocore.client.AioBaseClient) aio_aws.s3_uri.S3Info[source]
Collect data from an S3 HEAD request for an S3URI
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
an S3Info object with HEAD data on success; on failure the S3Info object has no HEAD data
- async aio_aws.s3_aio.s3_files_info(s3_uris: List[Union[aio_aws.s3_uri.S3URI, str]], s3_client: aiobotocore.client.AioBaseClient) List[aio_aws.s3_uri.S3Info][source]
Collect data from S3 HEAD requests for many S3URI
- Parameters
s3_uris – a list of S3URI
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
a list of S3Info objects with HEAD data on success; on failure an S3Info object has no HEAD data
- async aio_aws.s3_aio.s3_load_file(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) Tuple[str, Optional[Any]][source]
Load various file types from s3; it supports files with a known file suffix, such as “.json”, “.geojson”, “.geojsons”, “.yaml”, “.yml”
- Parameters
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
a Tuple[s3_uri, Optional[s3_data]] where the s3_data could be None if the file type is not recognized or a file read fails
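Dispatching on the file suffix can be sketched with a lookup table of loaders. LOADERS and load_by_suffix are hypothetical names, the content is passed in as a string in place of an S3 read, and the YAML suffixes are left out because a YAML parser is not in the standard library.

```python
import json
from pathlib import PurePosixPath
from typing import Any, Callable, Dict, Optional, Tuple


def _load_json_lines(text: str) -> Any:
    # one JSON document per line, as in a ".geojsons" file
    return [json.loads(line) for line in text.split("\n") if line.strip()]


# Hypothetical suffix table; ".yaml" and ".yml" would map to a YAML loader
LOADERS: Dict[str, Callable[[str], Any]] = {
    ".json": json.loads,
    ".geojson": json.loads,
    ".geojsons": _load_json_lines,
}


def load_by_suffix(s3_uri: str, content: str) -> Tuple[str, Optional[Any]]:
    """Return (s3_uri, data); data is None for unknown suffixes or bad content."""
    loader = LOADERS.get(PurePosixPath(s3_uri).suffix.lower())
    if loader is None:
        return s3_uri, None
    try:
        return s3_uri, loader(content)
    except ValueError:  # json.JSONDecodeError is a ValueError
        return s3_uri, None


loaded = load_by_suffix("s3://bucket/data.json", '{"a": 1}')
```

Returning the URI alongside the data keeps results identifiable when many loads are gathered concurrently.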
- async aio_aws.s3_aio.s3_load_files(s3_uris: List[str], *args, s3_client: Optional[aiobotocore.client.AioBaseClient] = None, **kwargs) Dict[str, Optional[Any]][source]
Collect data from S3 files in JSON or YAML formats
- Parameters
s3_uris – a list of S3 URIs
s3_client – an optional aiobotocore.client.AioBaseClient for s3
- Returns
a Dict[s3_uri: s3_data] for all the s3 URIs that can be read successfully
- async aio_aws.s3_aio.yaml_dump(data: Any, file: Union[pathlib.Path, str]) Optional[Union[pathlib.Path, str]][source]
Write YAML to a file
- Parameters
data – any data compatible with yaml.safe_dump
file – a file path to write
- Returns
if the dump succeeds, return the file, or None
- async aio_aws.s3_aio.yaml_s3_dump(yaml_data: Any, s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) Optional[str][source]
Write YAML to an s3 URI
- Parameters
yaml_data – an object to yaml.dump
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
the s3 URI on success
S3 IO
These functions provide blocking serialization to files and s3 for GeoJSON, GeoJSONSeq, JSON, and YAML.
- class aio_aws.s3_io.JsonBaseModel[source]
Bases: pydantic.main.BaseModel
- json_dump(json_file: Union[str, pathlib.Path]) Optional[pathlib.Path][source]
Dump model dict to JSON file
- json_s3_dump(s3_uri: Union[aio_aws.s3_uri.S3URI, str], *args, **kwargs) Optional[aio_aws.s3_uri.S3URI][source]
Dump model dict to JSON file at S3 URI
- classmethod json_s3_load(s3_uri: Union[aio_aws.s3_uri.S3URI, str], *args, **kwargs)[source]
Load model from JSON file at S3 URI
- class aio_aws.s3_io.YamlBaseModel[source]
Bases: aio_aws.s3_io.JsonBaseModel
- classmethod load(config_file: Union[pathlib.Path, aio_aws.s3_uri.S3URI, str])[source]
- classmethod load_s3(s3_uri: Union[aio_aws.s3_uri.S3URI, str])[source]
- yaml_dump(yaml_file: Union[str, pathlib.Path]) Optional[pathlib.Path][source]
Dump model dict to YAML file
- yaml_s3_dump(s3_uri: Union[aio_aws.s3_uri.S3URI, str]) Optional[aio_aws.s3_uri.S3URI][source]
Dump model dict to YAML file at S3 URI
- classmethod yaml_s3_load(s3_uri: Union[aio_aws.s3_uri.S3URI, str])[source]
Load model from YAML file at S3 URI
- aio_aws.s3_io.geojson_s3_dump(geojson_data: Any, s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None) Optional[str][source]
Write GeoJSON to s3 URI
- Parameters
geojson_data – an object to json.dump
s3_uri – a fully qualified S3 URI for the s3 object to write
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the s3 URI on success
- aio_aws.s3_io.geojson_s3_load(s3_uri: str, *args, s3_client: Optional[botocore.client.BaseClient] = None, **kwargs) Dict[source]
Read GeoJSON data from s3 URI
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – an optional botocore.client.BaseClient for s3
- Returns
geojson data
- aio_aws.s3_io.geojsons_dump(geojson_features: List[Dict], geojsons_file: Union[pathlib.Path, str]) Optional[Union[pathlib.Path, str]][source]
Write a GeoJSON Text Sequence file
- Parameters
geojson_features – a list of geojson features; from any feature collection, this is geojson_collection[“features”]
geojsons_file – a file path to write
- Returns
if the dump succeeds, return the geojsons_file, or None
- aio_aws.s3_io.geojsons_s3_dump(geojson_features: List[Dict], s3uri: str, s3_client: Optional[botocore.client.BaseClient] = None) Optional[str][source]
Write GeoJSON Text Sequence files to s3 URI
[GeoJSON Text Sequences](https://tools.ietf.org/html/rfc8142) are lines of geojson features that are designed for streaming operations on large datasets. These files can be loaded by geopandas, using fiona driver=”GeoJSONSeq”, which can be auto-detected. For example:
    import geopandas as gpd
    s3_uri = "s3://your-bucket/prefix/input.geojsons"
    gdf = gpd.read_file(s3_uri)
- Parameters
geojson_features – a list of geojson features; from any feature collection, this is geojson_collection[“features”]
s3uri – a fully qualified S3 URI for the s3 object to write
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the s3 URI on success
- aio_aws.s3_io.geojsons_s3_load(s3_uri: str, *args, s3_client: Optional[botocore.client.BaseClient] = None, **kwargs) List[Dict][source]
Read GeoJSON Text Sequence data from s3 object
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – an optional botocore.client.BaseClient for s3
- Returns
geojson features
- aio_aws.s3_io.get_s3_content(s3_uri: str, *args, s3_client: Optional[botocore.client.BaseClient] = None, **kwargs)[source]
Read s3 URI
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the data from the s3 object
- aio_aws.s3_io.json_s3_dump(json_data: Any, s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None) Optional[str][source]
Write JSON to s3 URI
- Parameters
json_data – an object to json.dump
s3_uri – a fully qualified S3 URI for the s3 object to write
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the s3 URI on success
- aio_aws.s3_io.json_s3_load(s3_uri: str, *args, s3_client: Optional[botocore.client.BaseClient] = None, **kwargs) Any[source]
Load JSON from s3 URI
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – an optional botocore.client.BaseClient for s3
- Returns
data from the json load
- aio_aws.s3_io.put_s3_content(data_file: str, s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None) Optional[str][source]
Write a file to s3 URI
- Parameters
data_file – a data file
s3_uri – a fully qualified S3 URI for the s3 object to write
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the s3 URI on success
- aio_aws.s3_io.s3_file_info(s3_uri: Union[aio_aws.s3_uri.S3URI, str], s3_client: Optional[botocore.client.BaseClient] = None) aio_aws.s3_uri.S3Info[source]
Collect data from an S3 HEAD request for an S3URI
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – an optional botocore.client.BaseClient for s3
- Returns
an S3Info object with HEAD data on success; on failure the S3Info object has no HEAD data
- aio_aws.s3_io.s3_file_wait(s3_uri: Union[aio_aws.s3_uri.S3URI, str], delay: float = 2, max_attempts: int = 30, s3_client: Optional[botocore.client.BaseClient] = None) Optional[aio_aws.s3_uri.S3URI][source]
Wait for s3 object to exist.
- Parameters
s3_uri – s3 object URI
delay – time between polling the object existence
max_attempts – number of times to poll object existence
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the S3URI, if it exists
- Raises
FileNotFoundError – if it does not exist
botocore.exceptions.WaiterError – for waiting errors or when an object does not exist
botocore.exceptions.ClientError – other client errors
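The delay and max_attempts parameters describe a bounded polling loop. A minimal sketch of that idea, assuming a hypothetical wait_for_object helper and an exists callable that stands in for the HEAD request; the WaiterError in the list above suggests the library may rely on a botocore waiter, which this sketch does not reproduce.

```python
import time
from typing import Callable


def wait_for_object(
    exists: Callable[[], bool], delay: float = 2, max_attempts: int = 30
) -> int:
    """Poll exists() up to max_attempts times; return the successful attempt number."""
    for attempt in range(1, max_attempts + 1):
        if exists():
            return attempt
        if attempt < max_attempts:
            time.sleep(delay)  # pause between polls, not after the last one
    raise FileNotFoundError(f"object not found after {max_attempts} attempts")


# Simulate an object that appears on the third poll
polls = iter([False, False, True])
attempts = wait_for_object(lambda: next(polls), delay=0, max_attempts=5)
```

With the defaults shown in the signature (delay=2, max_attempts=30), a loop like this would give up after roughly a minute.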
- aio_aws.s3_io.s3_files_info(s3_uris: Iterable[Union[aio_aws.s3_uri.S3URI, str]], s3_client: Optional[botocore.client.BaseClient] = None) List[aio_aws.s3_uri.S3Info][source]
Collect data from S3 HEAD requests for many S3URI
- Parameters
s3_uris – an iterable of fully qualified S3 URI for s3 objects
s3_client – an optional botocore.client.BaseClient for s3
- Returns
a list of S3Info objects, with HEAD data for each s3 object that exists; on failure the S3Info object has no HEAD data
- aio_aws.s3_io.s3_io_client(*args, **kwargs) botocore.client.BaseClient[source]
This creates a botocore.client.BaseClient that uses urllib3, which should be thread-safe, with a default connection pool.
This uses a botocore.config.Config with 3 retries using a standard back off method
- Returns
a botocore.client.BaseClient for s3
- aio_aws.s3_io.yaml_s3_dump(yaml_data: Any, s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None) Optional[str][source]
Write YAML to s3 URI
- Parameters
yaml_data – an object to yaml.dump
s3_uri – a fully qualified S3 URI for the s3 object to write
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the s3 URI on success
- aio_aws.s3_io.yaml_s3_load(s3_uri: str, *args, s3_client: Optional[botocore.client.BaseClient] = None, **kwargs) Any[source]
Load YAML from s3 URI
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – an optional botocore.client.BaseClient for s3
- Returns
data from the yaml load
S3URI
- class aio_aws.s3_uri.S3Info(s3_uri: aio_aws.s3_uri.S3URI, s3_size: Optional[int] = None, last_modified: Optional[datetime.datetime] = None)[source]
Bases: object
A mutable value object for s3 object information. This object is used with the results of HEAD requests for s3 URIs.
This class is mutable and may not satisfy requirements for a hashable object.
    from aio_aws.s3_uri import S3Info, S3URI

    # s3_client and response_success are assumed to be defined elsewhere
    s3_uri = S3URI("s3://bucket/key")
    s3_info = S3Info(s3_uri=s3_uri)
    s3_head = s3_client.head_object(Bucket=s3_uri.bucket, Key=s3_uri.key)
    if response_success(s3_head):
        s3_info.last_modified = s3_head["LastModified"]
        s3_info.s3_size = int(s3_head["ContentLength"])
- property dict: Dict
- Returns
a dict representation with primary data types, which is compatible with JSON serialization
- property iso8601: Optional[str]
- Returns
datetime.isoformat(), if possible
- property json: str
- Returns
a JSON representation
- last_modified: datetime.datetime = None
- s3_size: int = None
- s3_uri: aio_aws.s3_uri.S3URI
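A mutable value object with JSON-friendly views can be sketched with a plain dataclass. ObjectInfo is a hypothetical analogue that stores the URI as a str, and the keys of its dict view are assumptions, not the documented S3Info output.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional


@dataclass
class ObjectInfo:
    """Hypothetical analogue of S3Info, holding a plain str URI."""

    s3_uri: str
    s3_size: Optional[int] = None
    last_modified: Optional[datetime] = None

    @property
    def iso8601(self) -> Optional[str]:
        return self.last_modified.isoformat() if self.last_modified else None

    @property
    def dict(self) -> Dict:
        # only primary data types, so json.dumps can serialize the result
        return {
            "s3_uri": self.s3_uri,
            "s3_size": self.s3_size,
            "last_modified": self.iso8601,
        }

    @property
    def json(self) -> str:
        return json.dumps(self.dict)


info = ObjectInfo("s3://bucket/key")
info.s3_size = 892913
info.last_modified = datetime(2019, 12, 3, 14, 27, 5, tzinfo=timezone.utc)
```

A default dataclass defines equality without a hash, so instances like this cannot be used as dict keys or set members, which is the hashing limitation noted for the mutable class.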
- class aio_aws.s3_uri.S3Object(bucket: str, key: str)[source]
Bases: object
An immutable value object for the bucket_name and key in an s3.ObjectSummary.
This frozen dataclass should work around problems with Pickle for an s3.ObjectSummary. This frozen dataclass should work with data structures that require a hashable object.
    from aio_aws.s3_uri import S3Object
    # obj is s3.ObjectSummary
    S3Object(bucket=obj.bucket_name, key=obj.key)
- bucket: str
- key: str
- property s3_uri: str
s3_uri: str for s3://{bucket}/{key} where the {bucket}/{key} are forced into a posix path
- class aio_aws.s3_uri.S3Parts(bucket: str, key: str, PROTOCOL: str = 's3://', DELIMITER: str = '/')[source]
Bases: object
A frozen value object for s3 parts: a bucket and key. This class uses a default ‘s3://’ protocol and a default ‘/’ delimiter.
This frozen dataclass should work around problems with Pickle or data structures that require a hashable object.
    from aio_aws.s3_uri import S3Parts
    # assume obj is s3.ObjectSummary
    S3Parts(bucket=obj.bucket_name, key=obj.key)
    # Parse any s3 URI
    S3Parts.parse_s3_uri("s3://bucket/key")
- DELIMITER: str = '/'
- PROTOCOL: str = 's3://'
- bucket: str
- key: str
- static parse_s3_uri(uri: str) aio_aws.s3_uri.S3Parts[source]
- property s3_uri
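Why a frozen dataclass suits this role can be seen in a small sketch. Parts is a hypothetical analogue of S3Parts, and its parsing rule (split on the first delimiter after the protocol) is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Parts:
    """Hypothetical analogue of S3Parts: an immutable bucket and key."""

    bucket: str
    key: str
    PROTOCOL: str = "s3://"
    DELIMITER: str = "/"

    @staticmethod
    def parse_s3_uri(uri: str) -> "Parts":
        if not uri.startswith(Parts.PROTOCOL):
            raise ValueError(f"not an s3 URI: {uri}")
        # everything up to the first delimiter is the bucket, the rest is the key
        bucket, _, key = uri[len(Parts.PROTOCOL):].partition(Parts.DELIMITER)
        return Parts(bucket=bucket, key=key)

    @property
    def s3_uri(self) -> str:
        return f"{self.PROTOCOL}{self.bucket}{self.DELIMITER}{self.key}"


parts = Parts.parse_s3_uri("s3://bucket/path/to/key.json")
unique = {parts, Parts(bucket="bucket", key="path/to/key.json")}
```

Freezing the dataclass generates a hash from the field values, so equal instances collapse to one entry in a set or dict.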
- class aio_aws.s3_uri.S3Paths(s3_uri: str = '', bucket: str = '', key: str = '')[source]
Bases: object
S3 URI components
- property bucket: str
- glob_file_pattern()[source]
Construct a glob pattern relative to the key_path. The final glob pattern is: {key_path}/**/{file_stem}*.*
- Returns
str
- glob_pattern(glob_pattern: str = '**/*')[source]
Construct a glob pattern relative to the key_path. The final glob pattern is: str(PurePosixPath(self.key_path) / glob_pattern)
- Parameters
glob_pattern – str defaults to '**/*'
- Returns
str
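The two glob patterns can be reproduced with PurePosixPath alone. This sketch assumes that key_path is the parent directory of the key and that file_stem is the file name without its last suffix; both functions here are free-standing and hypothetical, not the class methods.

```python
from pathlib import PurePosixPath


def glob_pattern(key: str, pattern: str = "**/*") -> str:
    """Join a glob pattern onto the directory part of the key."""
    key_path = str(PurePosixPath(key).parent)
    return str(PurePosixPath(key_path) / pattern)


def glob_file_pattern(key: str) -> str:
    """Build {key_path}/**/{file_stem}*.* for the key."""
    key_posix = PurePosixPath(key)
    return f"{key_posix.parent}/**/{key_posix.stem}*.*"


key = "path/to/image.tif"
everything_below = glob_pattern(key)
derivatives = glob_file_pattern(key)
```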
- property key: str
- property key_file: str
- property key_path: str
- static parse_s3_uri(s3_uri: str) aio_aws.s3_uri.S3Paths[source]
Parse an S3 URI into components; the delimiter must be ‘/’
- Parameters
s3_uri –
- Returns
S3Paths(bucket, key, key_path, key_file)
- property protocol: str
- property s3_uri
s3_uri: str for s3://{bucket}/{key}
- class aio_aws.s3_uri.S3URI(s3_uri: str = '', bucket: str = '', key: str = '')[source]
Bases: aio_aws.s3_uri.S3Paths
S3 URI representation and parsing.
At present, this class is designed to represent one s3 object, although it has some utilities for working with general bucket and key paths.
For technical details on S3 Objects, refer to
https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
The following are the rules for naming S3 buckets in all AWS Regions:
Bucket names must follow DNS-compliant naming conventions. Amazon S3 no longer supports creating bucket names that contain uppercase letters or underscores. This ensures that each bucket can be addressed using virtual host style addressing, such as https://myawsbucket.s3.amazonaws.com.
Bucket names must be unique across all existing bucket names in Amazon S3.
Bucket names must comply with DNS naming conventions.
Bucket names must be at least 3 and no more than 63 characters long.
Bucket names must not contain uppercase characters or underscores.
Bucket names must start with a lowercase letter or number.
Bucket names must be a series of one or more labels. Adjacent labels are separated by a single period (.). Bucket names can contain lowercase letters, numbers, and hyphens. Each label must start and end with a lowercase letter or a number.
Bucket names must not be formatted as an IP address (e.g., 192.168.5.4).
When you use virtual hosted–style buckets with Secure Sockets Layer (SSL), the SSL wildcard certificate only matches buckets that don’t contain periods. To work around this, use HTTP or write your own certificate verification logic. We recommend that you do not use periods (“.”) in bucket names when using virtual hosted-style buckets.
The following are the rules for naming S3 keys:
The name for a key is a sequence of Unicode characters whose UTF-8 encoding is at most 1024 bytes long.
- Parameters
bucket – str for bucket in s3://{bucket}/{key}
key – str for key in s3://{bucket}/{key}
- static parse_s3_uri(s3_uri: str) aio_aws.s3_uri.S3URI[source]
Parse an S3 URI into components; the delimiter must be ‘/’
- Parameters
s3_uri –
- Returns
S3URI(bucket, key, key_path, key_file)
- s3_derivatives() Iterable[source]
Use s3_objects(glob_file_pattern) to find all the derivatives with matching file names (key_file*.*) below the path of this file.
- Returns
Iterable[s3.ObjectSummary]
- s3_exists() bool[source]
Check the s3 file summary to confirm a file exists.
- Returns
True if the s3 file exists
- s3_head_request() dict[source]
Issue an HTTP HEAD request to get the s3 URI summary data.
- Returns
dict of HEAD response data or an empty dict if it fails.
- s3_object_summary() s3.ObjectSummary[source]
Create an s3.ObjectSummary for this S3URI; this does not check if the object exists.
- Returns
s3.ObjectSummary
- s3_objects(glob_pattern: Optional[str] = None, prefix: Optional[str] = None) Iterable[source]
Search all the s3 objects at or below the s3_uri.bucket, using an optional filter prefix and/or a glob pattern (defaults to everything).
Using the s3_uri.key_path as a filter prefix is most often critical to limit the number of bucket objects to process. The addition of a glob pattern is applied to the results of all objects found; the glob pattern match is applied to each s3.ObjectSummary.key.
- Parameters
prefix – str to specify a Prefix filter that limits the bucket objects to process
glob_pattern – str to specify a filter based on glob patterns
- Returns
Iterable[s3.ObjectSummary]
- aio_aws.s3_uri.bucket_validate(bucket_name: str) bool[source]
For technical details on S3 buckets, refer to
The following are the rules for naming S3 buckets in all AWS Regions:
Bucket names must follow DNS-compliant naming conventions. Amazon S3 no longer supports creating bucket names that contain uppercase letters or underscores. This ensures that each bucket can be addressed using virtual host style addressing, such as https://myawsbucket.s3.amazonaws.com.
Bucket names must be unique across all existing bucket names in Amazon S3.
Bucket names must comply with DNS naming conventions.
Bucket names must be at least 3 and no more than 63 characters long.
Bucket names must not contain uppercase characters or underscores.
Bucket names must start with a lowercase letter or number.
Bucket names must be a series of one or more labels. Adjacent labels are separated by a single period (.). Bucket names can contain lowercase letters, numbers, and hyphens. Each label must start and end with a lowercase letter or a number.
Bucket names must not be formatted as an IP address (e.g., 192.168.5.4).
When you use virtual hosted–style buckets with Secure Sockets Layer (SSL), the SSL wildcard certificate only matches buckets that don’t contain periods. To work around this, use HTTP or write your own certificate verification logic. We recommend that you do not use periods (“.”) in bucket names when using virtual hosted-style buckets.
- Parameters
bucket_name –
- Returns
True if the bucket_name is valid
- Raises
ValueError if the bucket_name is not valid
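The naming rules listed above translate fairly directly into code. A minimal sketch, assuming a hypothetical is_valid_bucket_name helper that returns False where the documented function raises ValueError; it covers only the length, character, label and IP-address rules.

```python
import ipaddress
import re

# A label is lowercase letters, numbers and hyphens, and it must start
# and end with a lowercase letter or a number.
LABEL = re.compile(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")


def is_valid_bucket_name(bucket_name: str) -> bool:
    if not 3 <= len(bucket_name) <= 63:
        return False
    try:
        ipaddress.IPv4Address(bucket_name)
        return False  # formatted as an IP address
    except ValueError:
        pass
    # adjacent labels are separated by a single period; "a..b" has an empty label
    return all(LABEL.match(label) for label in bucket_name.split("."))


checks = {
    name: is_valid_bucket_name(name)
    for name in ["my-bucket.data-01", "My_Bucket", "192.168.5.4", "ab", "a..b"]
}
```

Uniqueness across all of Amazon S3 cannot be checked locally, so a name that passes these checks may still be rejected on creation.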
- aio_aws.utils.datetime_to_http_date(dt: datetime.datetime) str[source]
Format a datetime as an HTTP date string (using GMT).
- Parameters
dt – a datetime
- Returns
an HTTP date string, e.g. “Mon, 23 Mar 2020 15:29:33 GMT”
- aio_aws.utils.datetime_to_unix_milliseconds(dt: datetime.datetime) int[source]
datetime to unix timestamp - time since the epoch in milliseconds
- aio_aws.utils.handle_head_error_code(error: botocore.exceptions.ClientError, item: Optional[str] = None) Optional[bool][source]
- aio_aws.utils.http_date_to_datetime(http_date: str) datetime.datetime[source]
Parse an HTTP date string into a datetime.
- Parameters
http_date – e.g. “Mon, 23 Mar 2020 15:29:33 GMT”
- Returns
a datetime
- aio_aws.utils.http_date_to_timestamp(http_date: str) float[source]
Parse an HTTP date string into a timestamp as seconds since the epoch.
- Parameters
http_date – e.g. “Mon, 23 Mar 2020 15:29:33 GMT”
- Returns
a timestamp (seconds since the epoch)
- aio_aws.utils.timestamp_to_http_date(ts: float) str[source]
Format a timestamp (seconds since the epoch) as an HTTP date string (using GMT).
- Parameters
ts – a timestamp (seconds since the epoch)
- Returns
an HTTP date string, e.g. “Mon, 23 Mar 2020 15:29:33 GMT”
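These HTTP date conversions can be approximated with the standard library's email.utils. The helper names to_http_date and from_http_date are hypothetical, and the library's own implementation may differ.

```python
from datetime import datetime, timezone
from email.utils import format_datetime, parsedate_to_datetime


def to_http_date(dt: datetime) -> str:
    # usegmt=True requires an aware datetime in UTC, so convert first;
    # a naive datetime is interpreted as local time by astimezone()
    return format_datetime(dt.astimezone(timezone.utc), usegmt=True)


def from_http_date(http_date: str) -> datetime:
    # a "GMT" suffix yields an aware datetime with a zero UTC offset
    return parsedate_to_datetime(http_date)


dt = datetime(2020, 3, 23, 15, 29, 33, tzinfo=timezone.utc)
http_date = to_http_date(dt)
round_trip = from_http_date(http_date)
```

The timestamp variants follow from the same pair: datetime.fromtimestamp(ts, tz=timezone.utc) before formatting, and .timestamp() after parsing.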
- aio_aws.utils.utc_unix_milliseconds() int[source]
Unix timestamp - time since the epoch in milliseconds
- aio_aws.uuid_utils.generate_hex_uuids(n_uuids: int) Iterator[str][source]
Generate an iterator of UUIDs
- aio_aws.uuid_utils.generate_uuids(n_uuids: int) Iterator[str][source]
Generate an iterator of UUIDs