AioAws
AioAWS Settings
Using asyncio for AWS services requires the aiobotocore library, which wraps a
release of botocore to patch it with features for async coroutines using
asyncio and aiohttp. To avoid flooding a service with too many concurrent
requests (effectively a denial-of-service attack), the async approach should
use a client connection limiter, based on asyncio.Semaphore().
It's recommended to use a single session and a single client with a connection
pool. Although there are context manager patterns, it's also possible to
manage closing the client after everything is done.
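For example, a minimal sketch of that pattern (the bucket name is a placeholder) might look like:

import asyncio

from aio_aws.aio_aws_config import AioAWSConfig

async def main():
    # one config -> one session and one client with a connection pool
    config = AioAWSConfig(max_pool_connections=10, sem=10)
    async with config.create_client("s3") as client:
        # the config semaphore limits concurrent requests on the shared client
        async with config.semaphore:
            response = await client.head_bucket(Bucket="your-bucket-name")
            print(response["ResponseMetadata"]["HTTPStatusCode"])

asyncio.run(main())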
class aio_aws.aio_aws_config.AioAWSConfig(aws_region: str = None, retries: int = 4, min_pause: float = 5, max_pause: float = 30, min_jitter: float = 1, max_jitter: float = 10, max_pool_connections: int = 10, sem: int = 10, session: aiobotocore.session.AioSession = None)
Bases: object
aws_region: str = None
an optional AWS region name
create_client(service: str, *args, **kwargs) → aiobotocore.client.AioBaseClient
Create and yield a new client using the AioAWSConfig.session; the clients are
configured with a default limit on the size of the connection pool defined by
AioAWSConfig.max_pool_connections

config = AioAWSConfig()
async with config.create_client("s3") as client:
    response = await client.head_bucket(Bucket=bucket_name)

It is possible to pass through additional args and kwargs, including
config=botocore.client.Config.
- Yield
an aiobotocore.client.AioBaseClient for AWS service
property default_client_config
max_jitter: float = 10
an asyncio.sleep for random.uniform(min_jitter, max_jitter)

max_pause: float = 30
an asyncio.sleep for random.uniform(min_pause, max_pause)

max_pool_connections: int = 10
defines a limit to the number of client connections

min_jitter: float = 1
an asyncio.sleep for random.uniform(min_jitter, max_jitter)

min_pause: float = 5
an asyncio.sleep for random.uniform(min_pause, max_pause)

retries: int = 4
a number of retries for an AWS client request/response

sem: int = 10
property semaphore
An asyncio.Semaphore to limit the number of concurrent clients; this defaults
to the session client connection pool

session: aiobotocore.session.AioSession = None
an aiobotocore.session.AioSession
aio_aws.aio_aws_config.MAX_JITTER: float = 10
Maximum API request jitter

aio_aws.aio_aws_config.MAX_PAUSE: float = 30
Maximum task pause

aio_aws.aio_aws_config.MAX_POOL_CONNECTIONS = 10
max_pool_connections for AWS clients (10 by default)

aio_aws.aio_aws_config.MIN_JITTER: float = 1
Minimum API request jitter

aio_aws.aio_aws_config.MIN_PAUSE: float = 5
Minimum task pause

aio_aws.aio_aws_config.RETRY_EXCEPTIONS = ['TooManyRequestsException', 'ThrottlingException']
Common exception codes that are retried
aio_aws.aio_aws_config.aio_aws_client(service_name: str, *args, **kwargs) → aiobotocore.client.AioBaseClient
Yield an asyncio AWS client with an option to provide a client-specific
config; this is a thin wrapper on aiobotocore.get_session().create_client(),
and the additional kwargs are passed through to session.create_client(**kwargs).
It is possible to pass through additional args and kwargs, including
config: aiobotocore.config.AioConfig (the default is aio_aws_default_config())

s3_endpoint = "http://localhost:5555"
client_config = botocore.client.Config(
    read_timeout=120,
    max_pool_connections=50,
)
async with aio_aws_client(
    "s3", endpoint_url=s3_endpoint, config=client_config
) as client:
    assert "aiobotocore.client.S3" in str(client)
    assert isinstance(client, aiobotocore.client.AioBaseClient)

- Parameters
service_name – an AWS service for a client, like "s3"; try
session.get_available_services()
- Yield
aiobotocore.client.AioBaseClient
aio_aws.aio_aws_config.aio_aws_default_config() → aiobotocore.config.AioConfig
Get a default asyncio AWS config using a default MAX_POOL_CONNECTIONS
- Returns
aiobotocore.config.AioConfig
aio_aws.aio_aws_config.aio_aws_default_session() → aiobotocore.session.AioSession
Get a default asyncio AWS session with a default config from aio_aws_default_config()
- Returns
aiobotocore.session.AioSession
aio_aws.aio_aws_config.aio_aws_session(aio_aws_config: Optional[aiobotocore.config.AioConfig] = None) → aiobotocore.session.AioSession
Get an asyncio AWS session with an 'aio-aws' user agent name
- Parameters
aio_aws_config – an aiobotocore.config.AioConfig (default aio_aws_default_config())
- Returns
aiobotocore.session.AioSession
aio_aws.aio_aws_config.asyncio_default_semaphore() → asyncio.locks.Semaphore
A default semaphore to limit creation of clients; it defaults to
2 * MAX_POOL_CONNECTIONS
- Returns
asyncio.Semaphore
async aio_aws.aio_aws_config.delay(task_id: str, min_pause: float = 5, max_pause: float = 30) → float
Await a random pause between min_pause and max_pause
- Parameters
task_id – an ID for the task awaiting this delay
min_pause – defaults to MIN_PAUSE
max_pause – defaults to MAX_PAUSE
- Returns
random interval for pause
async aio_aws.aio_aws_config.jitter(task_id: str = 'jitter', min_jitter: float = 1, max_jitter: float = 10) → float
Await a random pause between min_jitter and max_jitter
- Parameters
task_id – an optional ID for the task awaiting this jitter
min_jitter – defaults to MIN_JITTER
max_jitter – defaults to MAX_JITTER
- Returns
random interval for pause
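As a sketch (a hypothetical helper, not part of the library), these pieces can build a simple retry loop with jittered pauses for throttled requests:

from botocore.exceptions import ClientError

from aio_aws.aio_aws_config import RETRY_EXCEPTIONS, jitter

async def head_with_retries(s3_client, bucket: str, retries: int = 4):
    # retry throttled requests, with a random jittered pause between tries
    for tries in range(retries + 1):
        try:
            return await s3_client.head_bucket(Bucket=bucket)
        except ClientError as err:
            error_code = err.response["Error"]["Code"]
            if error_code in RETRY_EXCEPTIONS and tries < retries:
                await jitter("head-bucket", min_jitter=1, max_jitter=10)
                continue
            raise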
AioAWS Batch
In testing, this module can run and monitor hundreds of jobs from a laptop
with modest CPU/RAM resources. It can recover from a crash by using a
db-state, without re-running the same jobs (matched by jobName). It should be
able to scale to run thousands of jobs.
To run the example, the aio_aws.aio_aws_batch module has a main that will run
and manage about 5 live batch jobs (very small sleep jobs that don't cost much
to run). The job state is persisted to aws_batch_jobs.json and, if it runs
successfully, it will not run the jobs again; the TinyDB is used to recover
job state by jobName. (This demo assumes that some simple AWS Batch
infrastructure exists already.)
# setup the python virtualenv
# check the main details and modify for a preferred batch queue/CE and AWS region
$ ./aio_aws/aio_aws_batch.py
Test async batch jobs
# wait a few minutes and watch the status messages
# it submits and monitors the jobs until they complete
# job status is saved and updated in `aws_batch_jobs.json`
# when it's done, run it again and see that nothing is re-submitted
$ ./aio_aws/aio_aws_batch.py
If the job monitoring is halted for some reason (like CTRL-C), it can recover
from the db-state, e.g.
$ ./aio_aws/aio_aws_batch.py
Test async batch jobs
[INFO] 2020-03-05T14:51:53.372Z aio-aws:<module>:485 AWS Batch job (test-sleep-job-0000) recovered from db
[INFO] 2020-03-05T14:51:53.373Z aio-aws:<module>:485 AWS Batch job (test-sleep-job-0001) recovered from db
[INFO] 2020-03-05T14:51:53.373Z aio-aws:<module>:485 AWS Batch job (test-sleep-job-0002) recovered from db
[INFO] 2020-03-05T14:51:53.374Z aio-aws:<module>:485 AWS Batch job (test-sleep-job-0003) recovered from db
[INFO] 2020-03-05T14:51:53.374Z aio-aws:<module>:485 AWS Batch job (test-sleep-job-0004) recovered from db
[INFO] 2020-03-05T14:51:53.690Z aio-aws:aio_batch_job_waiter:375 AWS Batch job (846d54d4-c3c3-4a3b-9101-646d78d3bbfb) status: RUNNABLE
[INFO] 2020-03-05T14:51:53.692Z aio-aws:aio_batch_job_waiter:375 AWS Batch job (dfce3461-9eab-4f5b-846c-6f223d593f6f) status: RUNNABLE
[INFO] 2020-03-05T14:51:53.693Z aio-aws:aio_batch_job_waiter:375 AWS Batch job (637e6b27-8d4d-4f45-b988-c00775461616) status: RUNNABLE
[INFO] 2020-03-05T14:51:53.701Z aio-aws:aio_batch_job_waiter:375 AWS Batch job (d9ac27c9-e7d3-49cd-8f53-c84a9b4c1750) status: RUNNABLE
[INFO] 2020-03-05T14:51:53.732Z aio-aws:aio_batch_job_waiter:375 AWS Batch job (7ebfe7c4-44a4-40d6-9eab-3708e334689d) status: RUNNABLE
For the demo to run quickly, most of the module settings are tuned for fast jobs. For much longer running jobs, there are functions that only submit jobs or only check jobs, and the monitoring settings should be changed so that jobs are checked only every 10 or 20 minutes.
Quick Start
from aio_aws.aws_batch_models import AWSBatchJob
from aio_aws.aio_aws_batch import batch_monitor_jobs
from aio_aws.aio_aws_batch import batch_submit_jobs

job1 = AWSBatchJob(
    job_name="sleep-1-job",
    job_definition=your_job_definition_arn,
    job_queue=your_job_queue_arn,
    command=["/bin/sh", "-c", "echo Hello && sleep 1 && echo Bye"],
)
job2 = AWSBatchJob(
    job_name="sleep-2-job",
    job_definition=your_job_definition_arn,
    job_queue=your_job_queue_arn,
    command=["/bin/sh", "-c", "echo Hello && sleep 2 && echo Bye"],
)
jobs = [job1, job2]
batch_submit_jobs(jobs=jobs)
batch_monitor_jobs(jobs=jobs)
This example uses high-level synchronous wrappers to make it easy to get started. There are a lot of details hidden in these convenient wrappers, but all the code is easily read in aio_aws.aio_aws_batch. For advanced use, please read the source code, especially the unit tests in the code repo. There is very little time to maintain the documentation.
Monitoring Jobs
The aio_aws.aio_aws_batch.aio_batch_job_manager() can submit a job, wait for
it to complete, and retry if it fails on a SPOT termination. It saves the job
status using an aio_aws.aio_aws_batch_db.AioAWSBatchDB. The job manager uses
aio_aws.aio_aws_batch.aio_batch_job_waiter(), which uses these settings to
control the async-wait between polling the job status.
These settings control how often job descriptions are polled. These requests for job status are also limited by the client connection pool and the client semaphore used by the job manager. Since AWS Batch has API limits on the number of requests for job status, it’s best to use a client connection pool and semaphore of about 10 connections. Any failures to poll for a job status will be retried a few times (using some random jitter on retry rates).
To modify the polling frequency settings, use a custom config. For example, the unit test suite uses much faster polling on mock batch jobs to speed up the unit tests:
config = AWSBatchConfig(
start_pause=0.4, min_pause=0.8, max_pause=1.0, min_jitter=0.1, max_jitter=0.2,
)
class aio_aws.aio_aws_batch.AWSBatchConfig(aws_region: str = None, retries: int = 4, min_pause: float = 5, max_pause: float = 30, min_jitter: float = 1, max_jitter: float = 10, max_pool_connections: int = 10, sem: int = 10, session: aiobotocore.session.AioSession = None, start_pause: float = 30, aio_batch_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)
Bases: aio_aws.aio_aws_config.AioAWSConfig

aio_batch_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None
an optional AioAWSBatchDB

create_batch_client() → aiobotocore.client.AioBaseClient
Create and yield an AWS Batch client using the AWSBatchConfig.session
- Yield
an aiobotocore.client.AioBaseClient for AWS Batch

create_logs_client() → aiobotocore.client.AioBaseClient
Create and yield an AWS CloudWatchLogs client using the AWSBatchConfig.session
- Yield
an aiobotocore.client.AioBaseClient for AWS CloudWatchLogs

start_pause: float = 30
a batch job startup pause, random.uniform(start_pause, start_pause * 2); this
applies when the job status is in ["SUBMITTED", "PENDING", "RUNNABLE"]

aio_aws.aio_aws_batch.BATCH_STARTUP_PAUSE: float = 30
batch job startup pause (seconds)
async aio_aws.aio_aws_batch.aio_batch_cancel_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], reason: str = 'User cancelled job', config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)
Cancel jobs that can be cancelled (if they are not running or complete).
- Parameters
jobs – any AWSBatchJob
config – an AWSBatchConfig
- Returns
each job maintains state, so this function returns nothing
async aio_aws.aio_aws_batch.aio_batch_describe_jobs(job_ids: List[str], config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) → Optional[Dict]
Asynchronous coroutine to issue a batch job description request that
retrieves status information for up to 100 jobIds
- Parameters
job_ids – a list of batch jobIds
config – settings for task pauses between retries
- Returns
a describe_jobs response
- Raises
botocore.exceptions.ClientError
async aio_aws.aio_aws_batch.aio_batch_get_logs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)
Get job logs. The logs should be updated in any configured jobs-db.
- Parameters
jobs – any AWSBatchJob
config – an AWSBatchConfig
- Returns
each job maintains state, so this function returns nothing
async aio_aws.aio_aws_batch.aio_batch_job_cancel(job: aio_aws.aws_batch_models.AWSBatchJob, reason: str = 'User cancelled job', config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) → aio_aws.aws_batch_models.AWSBatchJob
Asynchronous coroutine to cancel a job in a batch job queue. Jobs that are in
the SUBMITTED, PENDING, or RUNNABLE state are cancelled. Jobs that have
progressed to STARTING or RUNNING are not cancelled, but the API operation
still succeeds, even if no job is cancelled. These jobs must be terminated.
- Parameters
job – an AWSBatchJob
reason – a reason to cancel the job
config – settings for task pauses between retries
- Returns
an updated AWSBatchJob
- Raises
botocore.exceptions.ClientError
- Raises
RetryError if it exceeds retries
async aio_aws.aio_aws_batch.aio_batch_job_logs(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) → Optional[List[Dict]]
Asynchronous coroutine to get logs for a batch job log stream. All the events
available for a log stream are collected and returned. The AWS CloudWatch log
streams can have delays; don't expect near real-time values in these data.
- Parameters
job – A set of job parameters
config – settings for task pauses between retries
- Returns
a list of all the log events from get_log_events responses
- Raises
botocore.exceptions.ClientError
- Raises
RetryError if it exceeds retries
async aio_aws.aio_aws_batch.aio_batch_job_manager(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) → aio_aws.aws_batch_models.AWSBatchJob
Asynchronous coroutine to manage a batch job. The job-manager will update
attributes on the job as the job passes through submission and completion. On
completion, the job logs are retrieved. As the job status and logs are
collected, the job data is persisted to the jobs_db.
Note that any job with a job.job_id will not re-run; those jobs will be
monitored until complete. To re-run a job that has already run, first call
the job.reset() method to clear any previous job state. (Any previous
attempts are recorded in job.job_tries and in the job.job_description.)
- Parameters
job – a batch job spec
config – settings for task pauses between retries
- Returns
an updated AWSBatchJob when it's complete
- Raises
botocore.exceptions.ClientError
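As a sketch (the queue and definition ARNs are placeholders), the job manager can be run directly for a single job:

import asyncio

from aio_aws.aio_aws_batch import AWSBatchConfig, aio_batch_job_manager
from aio_aws.aws_batch_models import AWSBatchJob

job = AWSBatchJob(
    job_name="sleep-1-job",
    job_definition="your-job-definition-arn",  # placeholder ARN
    job_queue="your-job-queue-arn",            # placeholder ARN
    command=["/bin/sh", "-c", "sleep 1"],
)
config = AWSBatchConfig()
job = asyncio.run(aio_batch_job_manager(job, config=config))
print(job.status)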
async aio_aws.aio_aws_batch.aio_batch_job_status(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) → aio_aws.aws_batch_models.AWSBatchJob
Asynchronous coroutine to update a single job description. The job data is
updated (it's saved to an optional jobs-db).
Use aio_batch_update_jobs() for more efficient updates to many jobs.
- Parameters
job – a batch job
config – settings for task pauses between retries
- Returns
an updated AWSBatchJob
- Raises
botocore.exceptions.ClientError
async aio_aws.aio_aws_batch.aio_batch_job_submit(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) → aio_aws.aws_batch_models.AWSBatchJob
Submit a batch job; for a successful submission, the jobId and submission
details are updated on the job object (and an optional jobs-db).
- Parameters
job – A set of job parameters
config – settings for task pauses between retries
- Returns
an updated AWSBatchJob (modified in-place)
- Raises
botocore.exceptions.ClientError
async aio_aws.aio_aws_batch.aio_batch_job_terminate(job: aio_aws.aws_batch_models.AWSBatchJob, reason: str = 'User terminated job', config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) → aio_aws.aws_batch_models.AWSBatchJob
Asynchronous coroutine to terminate a batch job
- Parameters
job – an AWSBatchJob
reason – a reason to terminate the job
config – settings for task pauses between retries
- Returns
an AWSBatchJob after it is terminated
- Raises
botocore.exceptions.ClientError
async aio_aws.aio_aws_batch.aio_batch_job_waiter(job: aio_aws.aws_batch_models.AWSBatchJob, config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None) → Optional[aio_aws.aws_batch_models.AWSBatchJob]
Asynchronous coroutine to wait on a batch job. There is no explicit timeout
on a job waiter; it depends on setting a timeout on the batch job definition.
The job waiter exits when the batch job status is either "SUCCEEDED" or
"FAILED". The job data is updated each time the job description is polled
(it's also saved to the jobs-db).
Job status identifiers are assumed to be: ["SUBMITTED", "PENDING", "RUNNABLE",
"STARTING", "RUNNING", "FAILED", "SUCCEEDED"]
- Parameters
job – a batch job
config – settings for task pauses between retries
- Returns
an updated AWSBatchJob when it's complete
- Raises
botocore.exceptions.ClientError
async aio_aws.aio_aws_batch.aio_batch_monitor_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)
Monitor submitted jobs until they complete.
- Parameters
jobs – any AWSBatchJob
config – an AWSBatchConfig
- Returns
each job maintains state, so this function returns nothing
async aio_aws.aio_aws_batch.aio_batch_run_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)
Submit jobs that have not been submitted yet, and monitor all jobs until they
complete.
- Parameters
jobs – any AWSBatchJob
config – an AWSBatchConfig
- Returns
each job maintains state, so this function returns nothing
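For example, a sketch of the async entry point (the ARNs are placeholders, as in the Quick Start above):

import asyncio

from aio_aws.aio_aws_batch import AWSBatchConfig, aio_batch_run_jobs
from aio_aws.aws_batch_models import AWSBatchJob

jobs = [
    AWSBatchJob(
        job_name=f"sleep-{i}-job",
        job_definition="your-job-definition-arn",  # placeholder ARN
        job_queue="your-job-queue-arn",            # placeholder ARN
        command=["/bin/sh", "-c", f"sleep {i}"],
    )
    for i in (1, 2)
]
config = AWSBatchConfig(sem=10, max_pool_connections=10)
asyncio.run(aio_batch_run_jobs(jobs=jobs, config=config))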
async aio_aws.aio_aws_batch.aio_batch_submit_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)
Submit jobs that have not been submitted yet.
- Parameters
jobs – any AWSBatchJob
config – an AWSBatchConfig
- Returns
each job maintains state, so this function returns nothing

async aio_aws.aio_aws_batch.aio_batch_terminate_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], reason: str = 'User terminated job', config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)
Terminate jobs that can be killed (if they are not complete).
- Parameters
jobs – any AWSBatchJob
config – an AWSBatchConfig
- Returns
each job maintains state, so this function returns nothing

async aio_aws.aio_aws_batch.aio_batch_update_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], config: aio_aws.aio_aws_batch.AWSBatchConfig)
Asynchronous coroutine to update status on batch jobs.
Job status identifiers in the Batch service are: ["SUBMITTED", "PENDING",
"RUNNABLE", "STARTING", "RUNNING", "FAILED", "SUCCEEDED"]
- Parameters
jobs – a list of batch jobs
config – settings for task pauses between retries
- Returns
jobs are updated in-place, with no return
- Raises
botocore.exceptions.ClientError
aio_aws.aio_aws_batch.aio_find_complete_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)
Find any complete jobs
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to check the latest job status from the
jobs-db, if it is not available on the job (highly recommended)
- Returns
yield jobs found

aio_aws.aio_aws_batch.aio_find_jobs_by_status(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], job_states: List[str], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)
Find any jobs that match job states.
This is most often used when jobs are regenerated for jobs that could exist
in the jobs-db; there are alternative methods on the jobs-db to find all jobs
matching various job states.
- Parameters
jobs – any AWSBatchJob
job_states – a list of valid job states
jobs_db – a jobs-db to check the latest job status from the jobs-db, if it is
not available on the job (highly recommended)
- Returns
yield jobs found

aio_aws.aio_aws_batch.aio_find_running_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)
Find any running jobs
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to check the latest job status from the
jobs-db, if it is not available on the job (highly recommended)
- Returns
yield jobs found
aio_aws.aio_aws_batch.batch_cancel_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], reason: str = 'User cancelled job', jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)
Cancel jobs that can be cancelled (if they are not running or complete).
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an
aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible
for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.batch_get_logs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)
Get job logs.
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an
aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible
for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.batch_monitor_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)
Monitor submitted jobs until they complete.
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an
aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible
for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.batch_run_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)
Submit jobs that have not been submitted yet, and monitor all jobs until they
complete.
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an
aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible
for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.batch_submit_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)
Submit jobs that have not been submitted yet.
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an
aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible
for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.batch_terminate_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], reason: str = 'User terminated job', jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)
Terminate jobs that can be killed (if they are not complete).
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an
aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible
for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing

aio_aws.aio_aws_batch.batch_update_jobs(jobs: List[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, aio_batch_config: Optional[aio_aws.aio_aws_batch.AWSBatchConfig] = None)
Update job descriptions.
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to persist job data; this is only applied if an
aio_batch_config is not provided
aio_batch_config – a custom AWSBatchConfig; if provided, it is responsible
for providing any optional jobs-db
- Returns
each job maintains state, so this function returns nothing
aio_aws.aio_aws_batch.find_complete_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)
Find any complete jobs
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to check the latest job status from the
jobs-db, if it is not available on the job (highly recommended)
- Returns
yield jobs found

aio_aws.aio_aws_batch.find_incomplete_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None, reset_failed: bool = False)
This can find any jobs that are not complete. It can also reset FAILED jobs
that could be run again.
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to check the latest job status from the
jobs-db, if it is not available on the job (highly recommended)
reset_failed – if enabled, any FAILED jobs will be reset so they can be run again
- Returns
yield jobs found

aio_aws.aio_aws_batch.find_jobs_by_status(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], job_states: List[str], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)
Find any jobs that match job states.
This is most often used when jobs are regenerated for jobs that could exist
in the jobs-db; there are alternative methods on the jobs-db to find all jobs
matching various job states.
- Parameters
jobs – any AWSBatchJob
job_states – a list of valid job states
jobs_db – a jobs-db to check the latest job status from the jobs-db, if it is
not available on the job (highly recommended)
- Returns
yield jobs found

aio_aws.aio_aws_batch.find_running_jobs(jobs: Iterable[aio_aws.aws_batch_models.AWSBatchJob], jobs_db: Optional[aio_aws.aio_aws_batch_db.AioAWSBatchDB] = None)
Find any running jobs
- Parameters
jobs – any AWSBatchJob
jobs_db – an optional jobs-db to check the latest job status from the
jobs-db, if it is not available on the job (highly recommended)
- Returns
yield jobs found

aio_aws.aio_aws_batch.job_for_status(job, job_states) → Optional[aio_aws.aws_batch_models.AWSBatchJob]
AioAWS Batch-DB

class aio_aws.aio_aws_batch_db.AioAWSBatchDB
Bases: abc.ABC
Abstract Base Class for AWS Batch job databases
abstract async all_jobs() → List[aio_aws.aws_batch_models.AWSBatchJob]
Collect all jobs.
Warning: this could exceed memory; try to use gen_all_jobs() wherever possible.

abstract async count_by_job_status() → collections.Counter
Count all jobs by jobStatus
- Returns
a Counter of jobs by job status (could contain multiple entries for the same
jobName, if it is run more than once)

abstract async find_by_job_id(job_id: str) → Optional[Dict]
Find one job by the jobId
- Parameters
job_id – a batch jobId
- Returns
the job data or None

abstract async find_by_job_name(job_name: str) → List[Dict]
Find any jobs matching the jobName
- Parameters
job_name – a batch jobName
- Returns
a list of dictionaries containing job data

abstract async find_by_job_status(job_states: List[str]) → List[aio_aws.aws_batch_models.AWSBatchJob]
Find any jobs matching any job status values
- Parameters
job_states – a list of valid job states
- Returns
a list of AWSBatchJob (could contain multiple entries with the same jobName
if it is run multiple times with any jobStatus in the jobStatus values)

abstract async find_job_logs(job_id: str) → Optional[Dict]
Find job logs by job_id
- Parameters
job_id – str
- Returns
job logs document

abstract async find_jobs_to_run() → List[aio_aws.aws_batch_models.AWSBatchJob]
Find all jobs that have not SUCCEEDED.

abstract async find_latest_job_name(job_name: str) → Optional[aio_aws.aws_batch_models.AWSBatchJob]
Find the latest job matching the jobName, based on the "createdAt" time stamp.
- Parameters
job_name – a batch jobName
- Returns
the latest job record available

abstract gen_all_jobs() → AsyncIterator[aio_aws.aws_batch_models.AWSBatchJob]
Generate all jobs.

abstract async group_by_job_status() → Dict[str, List[Tuple[str, str, str]]]
Group all jobId by jobStatus
- Returns
a dictionary of job info by job status (could contain multiple entries with
the same jobName if it is run multiple times); the dictionary values contain
a list of job information tuples, e.g.
{ status: [ (job.job_id, job.job_name, job.status), ] }

abstract async jobs_recovery(jobs: List[aio_aws.aws_batch_models.AWSBatchJob]) → List[aio_aws.aws_batch_models.AWSBatchJob]
Use the job.job_name to find any jobs_db records to recover job data.

abstract async jobs_to_run(jobs: List[aio_aws.aws_batch_models.AWSBatchJob]) → List[aio_aws.aws_batch_models.AWSBatchJob]
Use the job.job_name to find any jobs_db records and check the job status on
the latest submission of any job matching the job-name. For any job that has
a matching job-name in the jobs_db, check whether the job has already run and
SUCCEEDED. Any jobs that have already run and SUCCEEDED are not returned.
Return all jobs that have not run yet or have not SUCCEEDED. Note that any
jobs subsequently input to the job-manager will not re-run if they already
have a job.job_id; those jobs will be monitored until complete. To re-run
jobs that are returned from this filter function, first use job.reset()
before passing the jobs to the job-manager.
Note
The input jobs are not modified in any way in this function, i.e. there are
no updates from the jobs_db applied to the input jobs.
To recover jobs from the jobs_db, use:
- AWSBatchDB.find_jobs_to_run()
- AWSBatchDB.jobs_db.all()
abstract async remove_by_job_id(job_id: str) → Optional[Dict]
Remove any job matching the jobId
- Parameters
job_id – a batch jobId
- Returns
a deleted document

abstract async remove_by_job_name(job_name: str) → Set[str]
Remove any jobs matching the jobName
- Parameters
job_name – a batch jobName
- Returns
a set of deleted job-ids

abstract async save_job(job: aio_aws.aws_batch_models.AWSBatchJob) → Optional[str]
Insert or update a job (if it has a job_id)
- Parameters
job – an AWSBatchJob
- Returns
the jobId if the job is saved

abstract async save_job_logs(job: aio_aws.aws_batch_models.AWSBatchJob) → List[int]
Insert or update job logs (if the job has a job_id)
- Parameters
job – an AWSBatchJob
- Returns
a List[tinydb.database.Document.doc_id]
class aio_aws.aio_aws_batch_db.AioAWSBatchRedisDB(redis_url: str = 'redis://127.0.0.1:6379')
Bases: aio_aws.aio_aws_batch_db.AioAWSBatchDB
AWS Batch job databases - aioredis implementation
The jobs and logs are kept in separate DBs so that performance on the jobs_db
is not compromised by too much data from job logs. This makes it possible to
use the jobs_db without capturing logs as well.
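As a sketch, an instance can be wired into a batch config via the aio_batch_db attribute documented above (this assumes a redis service is listening on the documented default URL):

from aio_aws.aio_aws_batch import AWSBatchConfig
from aio_aws.aio_aws_batch_db import AioAWSBatchRedisDB

# assumes a local redis service on the default URL
batch_jobs_db = AioAWSBatchRedisDB(redis_url="redis://127.0.0.1:6379")
config = AWSBatchConfig(aio_batch_db=batch_jobs_db)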
-
async
all_jobs
() → List[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Collect all jobs.
Warning: this could exceed memory; try to use gen_all_jobs() wherever possible.
-
async
count_by_job_status
() → collections.Counter[source]¶ Count all jobs by jobStatus
- Returns
a Counter of jobs by job status (could contain multiple entries for the same jobName, if it is run more than once)
-
property
db_alive
¶
-
property
db_info
¶
-
property
db_semaphore
¶ A semaphore to limit requests to the db
-
async
find_by_job_id
(job_id: str) → Optional[Dict][source]¶ Find one job by the jobId
- Parameters
job_id – a batch jobId
- Returns
the job data or None
-
async
find_by_job_name
(job_name: str) → List[Dict][source]¶ Find any jobs matching the jobName
- Parameters
job_name – a batch jobName
- Returns
a list of dictionaries containing job data
-
async
find_by_job_status
(job_states: List[str]) → List[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Find any jobs matching any jobStatus values
- Parameters
job_states – a list of valid job status values
- Returns
a list of AWSBatchJob (could contain multiple entries with the same jobName if it is run multiple times with any jobStatus in the jobStatus values)
-
async
find_job_logs
(job_id: str) → Optional[Dict][source]¶ Find job logs by job_id
- Parameters
job_id – str
- Returns
job logs document
-
async
find_jobs_to_run
() → List[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Find all jobs that have not SUCCEEDED.
-
async
find_latest_job_name
(job_name: str) → Optional[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Find the latest job matching the jobName, based on the “createdAt” time stamp.
- Parameters
job_name – a batch jobName
- Returns
the latest job record available
-
gen_all_jobs
() → AsyncIterator[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Generate all jobs.
-
async
group_by_job_status
() → Dict[str, List[Tuple[str, str, str]]][source]¶ Group all jobId by jobStatus
- Returns
a dictionary of job info by job status (could contain multiple entries with the same jobName if it is run multiple times); the dictionary values contain a list of job information tuples, e.g.
{ status: [ (job.job_id, job.job_name, job.status), ] }
-
property
jobs_db
¶
-
property
jobs_db_alive
¶
-
async
jobs_recovery
(jobs: List[aio_aws.aws_batch_models.AWSBatchJob]) → List[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Use the job.job_name to find any jobs_db records to recover job data.
-
async
jobs_to_run
(jobs: List[aio_aws.aws_batch_models.AWSBatchJob]) → List[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Use the job.job_name to find any jobs_db records and check the job status on the latest submission of any job matching the job-name. For any job that has a matching job-name in the jobs_db, check whether the job has already run and SUCCEEDED. Any jobs that have already run and SUCCEEDED are not returned.
Return all jobs that have not run yet or have not SUCCEEDED. Note that any jobs subsequently input to the job-manager will not re-run if they already have a job.job_id, those jobs will be monitored until complete. To re-run jobs that are returned from this filter function, first use job.reset() before passing the jobs to the job-manager.
Note
The input jobs are not modified in any way in this function, i.e. there are no updates from the jobs_db applied to the input jobs.
To recover jobs from the jobs_db, use:
- AWSBatchDB.find_jobs_to_run()
- AWSBatchDB.jobs_db.all()
-
property
logs_db
¶
-
property
logs_db_alive
¶
-
redis_url
: str = 'redis://127.0.0.1:6379'¶
-
async
remove_by_job_id
(job_id: str) → Optional[Dict][source]¶ Remove any job matching the jobId
- Parameters
job_id – a batch jobId
- Returns
a deleted document
-
async
remove_by_job_name
(job_name: str) → Set[str][source]¶ Remove any jobs matching the jobName
- Parameters
job_name – a batch jobName
- Returns
a set of deleted job-ids
-
async
save_job
(job: aio_aws.aws_batch_models.AWSBatchJob) → Optional[str][source]¶ Insert or update a job (if it has a job_id)
- Parameters
job – an AWSBatchJob
- Returns
the jobId if the job is saved
-
async
save_job_logs
(job: aio_aws.aws_batch_models.AWSBatchJob) → List[int][source]¶ Insert or update job logs (if the job has a job_id)
- Parameters
job – an AWSBatchJob
- Returns
a List[tinydb.database.Document.doc_id]
class aio_aws.aio_aws_batch_db.AioAWSBatchTinyDB(jobs_db_file: str = '/tmp/aws_batch_jobs.json', logs_db_file: str = '/tmp/aws_batch_logs.json')
Bases: aio_aws.aio_aws_batch_db.AioAWSBatchDB
AWS Batch job databases - TinyDB implementation
The jobs and logs are kept in separate DBs so that performance on the jobs_db
is not compromised by too much data from job logs. This makes it possible to
use the jobs_db without capturing logs as well.
The batch data is a TinyDB json file, e.g.

>>> import json
>>> jobs_db_file = '/tmp/aio_batch_jobs.json'
>>> logs_db_file = '/tmp/aio_batch_logs.json'
>>> with open(jobs_db_file) as job_file:
...     batch_data = json.load(job_file)
...
>>> len(batch_data['aws-batch-jobs'])
5
>>> import tinydb
>>> tinydb.TinyDB.DEFAULT_TABLE = "aws-batch-jobs"
>>> tinydb.TinyDB.DEFAULT_TABLE_KWARGS = {"cache_size": 0}
>>> # the jobs and logs are kept in separate DBs so
>>> # that performance on the jobs_db is not compromised
>>> # by too much data from job logs.
>>> jobs_db = tinydb.TinyDB(jobs_db_file)
>>> logs_db = tinydb.TinyDB(logs_db_file)
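Similarly, a sketch of wiring this TinyDB jobs-db into a batch config (the file paths are this implementation's documented defaults):

from aio_aws.aio_aws_batch import AWSBatchConfig
from aio_aws.aio_aws_batch_db import AioAWSBatchTinyDB

batch_jobs_db = AioAWSBatchTinyDB(
    jobs_db_file="/tmp/aws_batch_jobs.json",
    logs_db_file="/tmp/aws_batch_logs.json",
)
config = AWSBatchConfig(aio_batch_db=batch_jobs_db)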
-
async
all_jobs
() → List[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Collect all jobs.
Warning: this could exceed memory; try to use gen_all_jobs() wherever possible.
-
async
count_by_job_status
() → collections.Counter[source]¶ Count all jobs by jobStatus
- Returns
a Counter of jobs by job status (could contain multiple entries for the same jobName, if it is run more than once)
-
property
db_semaphore
¶ A semaphore to limit requests to the db
-
async
find_by_job_id
(job_id: str) → Optional[tinydb.database.Document][source]¶ Find one job by the jobId
- Parameters
job_id – a batch jobId
- Returns
the
AWSBatchJob.job_data()
or None
-
async
find_by_job_name
(job_name: str) → List[tinydb.database.Document][source]¶ Find any jobs matching the jobName
- Parameters
job_name – a batch jobName
- Returns
a list of documents containing
AWSBatchJob.job_data()
-
async
find_by_job_status
(job_states: List[str]) → List[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Find any jobs matching any jobStatus values
- Parameters
job_states – a list of valid job status values
- Returns
a list of AWSBatchJob (could contain multiple entries with the same jobName if it is run multiple times with any jobStatus in the jobStatus values)
-
async
find_job_logs
(job_id: str) → Optional[tinydb.database.Document][source]¶ Find job logs by job_id
- Parameters
job_id – str
- Returns
a tinydb.database.Document with job logs
-
async
find_jobs_to_run
() → List[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Find all jobs that have not SUCCEEDED. Note that any jobs handled by the job-manager will not re-run if they have a job.job_id, those jobs will be monitored until complete.
-
async
find_latest_job_name
(job_name: str) → Optional[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Find the latest job matching the jobName, based on the “createdAt” time stamp.
- Parameters
job_name – a batch jobName
- Returns
the latest job record available
-
gen_all_jobs
() → AsyncIterator[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Generate all jobs.
-
async
group_by_job_status
() → Dict[str, List[Tuple[str, str, str]]][source]¶ Group all jobId by jobStatus
- Returns
a dictionary of job info by job status (could contain multiple entries with the same jobName if it is run multiple times); the dictionary values contain a list of job information tuples, e.g.
{ status: [ (job.job_id, job.job_name, job.status), ] }
-
jobs_db_file
: str = '/tmp/aws_batch_jobs.json'¶ a file used for TinyDB(jobs_db_file)
-
async
jobs_recovery
(jobs: List[aio_aws.aws_batch_models.AWSBatchJob]) → List[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Use the job.job_name to find any jobs_db records to recover job data.
-
async
jobs_to_run
(jobs: List[aio_aws.aws_batch_models.AWSBatchJob]) → List[aio_aws.aws_batch_models.AWSBatchJob][source]¶ Use the job.job_name to find any jobs_db records and check the job status on the latest submission of any job matching the job-name. For any job that has a matching job-name in the jobs_db, check whether the job has already run and SUCCEEDED. Any jobs that have already run and SUCCEEDED are not returned.
Return all jobs that have not run yet or have not SUCCEEDED. Note that any jobs subsequently input to the job-manager will not re-run if they already have a job.job_id, those jobs will be monitored until complete. To re-run jobs that are returned from this filter function, first use job.reset() before passing the jobs to the job-manager.
Note
The input jobs are not modified in any way in this function, i.e. there are no updates from the jobs_db applied to the input jobs.
To recover jobs from the jobs_db, use:
- AWSBatchDB.find_jobs_to_run()
- AWSBatchDB.jobs_db.all()
-
logs_db_file
: str = '/tmp/aws_batch_logs.json'¶ a file used for TinyDB(logs_db_file)
-
async
remove_by_job_id
(job_id: str) → Optional[tinydb.database.Document][source]¶ Remove any job matching the jobId
- Parameters
job_id – a batch jobId
- Returns
a deleted document
-
async
remove_by_job_name
(job_name: str) → Set[str][source]¶ Remove any jobs matching the jobName
- Parameters
job_name – a batch jobName
- Returns
a set of deleted job-ids
-
async
save_job
(job: aio_aws.aws_batch_models.AWSBatchJob) → Optional[str][source]¶ Insert or update a job (if it has a job_id)
- Parameters
job – an AWSBatchJob
- Returns
the jobId if the job is saved
-
async
save_job_logs
(job: aio_aws.aws_batch_models.AWSBatchJob) → Optional[str][source]¶ Insert or update job logs (if the job has a job_id)
- Parameters
job – an AWSBatchJob
- Returns
a job.job_id if the logs are saved
AioAWS Batch Models
Batch Job metadata models.
class aio_aws.aws_batch_models.AWSBatchJob(job_name: str, job_queue: str, job_definition: str, command: Optional[List[str]] = None, depends_on: Optional[List[Dict]] = None, container_overrides: Optional[Dict] = None, job_id: Optional[str] = None, status: Optional[str] = None, job_tries: Optional[List[str]] = None, num_tries: int = 0, max_tries: int = 4, job_submission: Optional[Dict] = None, job_description: Optional[Dict] = None, logs: Optional[List[Dict]] = None)
Bases: object
AWS Batch job
Creating an AWSBatchJob instance does not run anything; it's simply a dataclass to retain and track job attributes. There are no instance methods to run a job; the instances are passed to async coroutine functions.
Replace 'command' with a 'container_overrides' dict for more options; do not use 'command' together with 'container_overrides'; if both are given, any valid 'command' value will replace the container_overrides['command'].
- Parameters
job_name – A job jobName (truncated to 128 characters).
job_definition – A job job_definition.
job_queue – A batch queue.
command – A container command.
depends_on – a list of dictionaries like:

    [{'jobId': 'abc123', 'type': 'N_TO_N' | 'SEQUENTIAL'}, ]

where 'type' is optional and used only for job arrays
container_overrides – a dictionary of container overrides. Overrides include
'instanceType', 'environment', and 'resourceRequirements'.
If the command parameter is defined, it overrides container_overrides['command'].
To override VCPU and MEMORY requirements in the ResourceRequirement of the
job definition, a ResourceRequirement must be specified in the SubmitJob request.
https://docs.aws.amazon.com/batch/latest/APIReference/API_ResourceRequirement.html
VCPU (int): This parameter maps to CpuShares in the Create a container
section of the Docker Remote API and the --cpu-shares option to docker run.
Each vCPU is equivalent to 1,024 CPU shares. This parameter is supported for
jobs that run on EC2 resources, but isn't supported for jobs that run on
Fargate resources.
MEMORY (int): This parameter indicates the amount of memory (in MiB) that is
reserved for the job. This parameter is supported for jobs that run on EC2
resources, but isn't supported for jobs that run on Fargate resources.

container_overrides = {
    "resourceRequirements": [
        {"type": "VCPU", "value": "2"},
        {"type": "MEMORY", "value": str(int(gib_to_mib(6)))},
    ]
}

max_tries – an optional limit to the number of job retries, which can apply
in the job-manager function to any job with a SPOT failure only, and it
applies regardless of the job-definition settings.
STATES = ['SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING', 'RUNNING', 'SUCCEEDED', 'FAILED']
allow_submit_job() → bool
Logic to determine whether a job can be submitted or resubmitted:
- jobs with an existing job.job_id should skip submission to avoid
  resubmission for the same job
- jobs with too many tries cannot be resubmitted
The AWSBatchJob.reset() can be applied in order to resubmit any job that is
not allowed.
- Returns
True if it is allowed
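A sketch of how this check might be used before resubmission (the helper function is hypothetical, not part of the library):

from aio_aws.aws_batch_models import AWSBatchJob

def prepare_for_resubmit(job: AWSBatchJob) -> AWSBatchJob:
    # hypothetical helper: clear prior job state so a job that already
    # ran can be submitted again by the job-manager
    if not job.allow_submit_job():
        job.reset()
    return job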
-
command
: List[str] = None¶
-
container_overrides
: Dict = None¶
-
property
created
¶ Timestamp (milliseconds since the epoch) for job createdAt; this depends on updates to the jobDescription.
The Unix timestamp (in milliseconds) for when the job was created. For non-array jobs and parent array jobs, this is when the job entered the SUBMITTED state (at the time SubmitJob was called). For array child jobs, this is when the child job was spawned by its parent and entered the PENDING state.
-
property
created_datetime
¶ Datetime for job createdAt; this depends on updates to the jobDescription.
-
property
db_data
¶ AWS Batch job data for state machine persistence
-
property
db_logs_data
¶ AWS Batch job with logs persistence
-
depends_on
: List[Dict] = None¶
-
property
elapsed
¶ Timestamp (milliseconds since the epoch) for job elapsed time; this depends on updates to the jobDescription; it is the difference between createdAt and stoppedAt (this can include long periods in a job queue).
-
job_definition
: str¶
-
job_description
: Optional[Dict] = None¶
-
job_id
: Optional[str] = None¶
-
job_name
: str¶
-
job_queue
: str¶
-
job_submission
: Optional[Dict] = None¶
-
job_tries
: List[str] = None¶
-
logs
: Optional[List[Dict]] = None¶
-
max_tries
: int = 4¶
-
num_tries
: int = 0¶
-
property
params
¶ AWS Batch parameters for job submission
-
property
runtime
¶ Timestamp (milliseconds since the epoch) for job run time; this depends on updates to the jobDescription; it is the difference between startedAt and stoppedAt.
-
property
spinup
¶ Timestamp (milliseconds since the epoch) for job startup time; this depends on updates to the jobDescription; it is the difference between createdAt and startedAt.
-
property
started
¶ Timestamp (milliseconds since the epoch) for job startedAt; this depends on updates to the jobDescription.
The Unix timestamp (in milliseconds) for when the job was started (when the job transitioned from the STARTING state to the RUNNING state). This parameter isn’t provided for child jobs of array jobs or multi-node parallel jobs.
-
property
started_datetime
¶ Datetime for job startedAt; this depends on updates to the jobDescription.
-
status
: Optional[str] = None¶
-
property
stopped
¶ Timestamp (milliseconds since the epoch) for job stoppedAt; this depends on updates to the jobDescription.
The Unix timestamp (in milliseconds) for when the job was stopped (when the job transitioned from the RUNNING state to a terminal state, such as SUCCEEDED or FAILED).
-
property
stopped_datetime
¶ Datetime for job stoppedAt; this depends on updates to the jobDescription.
-
property
submitted
¶ Timestamp (milliseconds since the epoch) for job submission
-
property
submitted_datetime
¶ Datetime for job submission; this depends on setting the job_submission.
class aio_aws.aws_batch_models.AWSBatchJobDescription(jobName: str = None, jobId: str = None, jobQueue: str = None, status: str = None, attempts: List[Dict] = None, statusReason: str = None, createdAt: int = None, startedAt: int = None, stoppedAt: int = None, dependsOn: List[str] = None, jobDefinition: str = None, parameters: Dict = None, container: Dict = None, timeout: Dict = None)
Bases: object
-
attempts
: List[Dict] = None¶
-
container
: Dict = None¶
-
createdAt
: int = None¶
-
dependsOn
: List[str] = None¶
-
jobDefinition
: str = None¶
-
jobId
: str = None¶
-
jobName
: str = None¶
-
jobQueue
: str = None¶
-
parameters
: Dict = None¶
-
startedAt
: int = None¶
-
status
: str = None¶
-
statusReason
: str = None¶
-
stoppedAt
: int = None¶
-
timeout
: Dict = None¶
class aio_aws.aws_batch_models.AWSBatchJobStates(value)
Bases: enum.Enum
An enumeration.
-
FAILED
= 7¶
-
PENDING
= 2¶
-
RUNNABLE
= 3¶
-
RUNNING
= 5¶
-
STARTING
= 4¶
-
SUBMITTED
= 1¶
-
SUCCEEDED
= 6¶
-
aio_aws.aws_batch_models.gb_to_gib(gb: Union[int, float]) → float
Gigabyte (GB) to Gibibyte (GiB)
- Parameters
gb – Gigabytes (GB)
- Returns
Gibibyte (GiB)
AioAWS Lambda
Manually create a 'lambda_dev' function with simple code like:

# lambda_dev/lambda_function.py
import json

def lambda_handler(event, context):
    csv = ", ".join([str(i) for i in range(10)])
    print(csv)  # to log something
    return {
        'statusCode': 200,
        'body': json.dumps(csv)
    }
The aio_aws_lambda.py module has a main script to call this simple lambda function. This lambda function easily runs within a 128 MB memory allocation and it runs in less time than the minimal billing period, so it's as cheap as it can be and could fit within a monthly free quota.
$ ./aio_aws/aio_aws_lambda.py
Test async lambda functions
[INFO] 2020-04-07T03:10:21.741Z aio-aws:aio_lambda_invoke:114 AWS Lambda (lambda_dev) invoked OK
1 lambdas finished in 0.31 seconds.
$ N_LAMBDAS=1000 ./aio_aws/aio_aws_lambda.py
# snipped logging messages
1000 lambdas finished in 14.95 seconds.
class aio_aws.aio_aws_lambda.AWSLambdaFunction(name: str, type: str = 'RequestResponse', log_type: str = 'None', context: Optional[str] = None, payload: bytes = b'', qualifier: Optional[str] = None, response: Optional[Dict] = None, data: Optional[bytes] = None)
Bases: object
AWS Lambda Function
Creating an AWSLambdaFunction instance does not create or invoke anything;
it's a dataclass to retain and track function parameters and response data.
- Parameters
name – A lambda FunctionName (truncated to 64 characters).
type – the type of function invocation (“RequestResponse”, “Event”, “DryRun”)
log_type – the type of function logging (“None” or “Tail”)
context – the client context; for an example of a ClientContext JSON, see PutEvents in the Amazon Mobile Analytics API Reference and User Guide. The ClientContext JSON must be base64-encoded and has a maximum size of 3583 bytes.
payload – an input payload (bytes or seekable file-like object); JSON that you want to provide to your Lambda function as input.
qualifier – an optional function version or alias name; If you specify a function version, the API uses the qualified function ARN to invoke a specific Lambda function. If you specify an alias name, the API uses the alias ARN to invoke the Lambda function version to which the alias points. If you don’t provide this parameter, then the API uses unqualified function ARN which results in invocation of the $LATEST version.
-
LOG_TYPES
= ['None', 'Tail']¶
-
TYPES
= ['RequestResponse', 'Event', 'DryRun']¶
-
property
content_length
¶
-
property
content_type
¶
-
context
: str = None¶
-
data
: bytes = None¶
-
property
error
¶ This could be a lambda function error or a client error
async invoke(config: aio_aws.aio_aws_config.AioAWSConfig, lambda_client: aiobotocore.client.AioBaseClient) → aio_aws.aio_aws_lambda.AWSLambdaFunction
Asynchronous coroutine to invoke a lambda function; this updates the response
and calls the read_response() method to handle the response.
- Parameters
config – aio session and client settings
lambda_client – aio client for lambda
- Returns
a lambda response
- Raises
botocore.exceptions.ClientError, botocore.exceptions.ParamValidationError
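For example, a sketch of a single invocation (this assumes the 'lambda_dev' function from the example above exists):

import asyncio
import json

from aio_aws.aio_aws_config import AioAWSConfig
from aio_aws.aio_aws_lambda import AWSLambdaFunction

async def main():
    config = AioAWSConfig()
    payload = json.dumps({"i": 0}).encode()
    func = AWSLambdaFunction(name="lambda_dev", payload=payload)
    async with config.create_client("lambda") as lambda_client:
        # invoke updates func.response and func.data in-place
        await func.invoke(config, lambda_client)
    print(func.data)

asyncio.run(main())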
-
property
json
¶
-
log_type
: str = 'None'¶
-
property
logs
¶
-
name
: str¶
-
property
params
¶ AWS Lambda parameters to invoke function
-
payload
: bytes = b''¶
-
qualifier
: str = None¶
async read_response()
Asynchronous coroutine to read a lambda response; this updates the data
attribute.
- Raises
botocore.exceptions.ClientError, botocore.exceptions.ParamValidationError
-
response
: Dict = None¶
-
property
response_headers
¶
-
property
response_metadata
¶
-
property
status_code
¶
-
property
text
¶
-
type
: str = 'RequestResponse'¶
-
async
aio_aws.aio_aws_lambda.
run_lambda_function_thread_pool
(lambda_functions: List[aio_aws.aio_aws_lambda.AWSLambdaFunction], n_tasks: int = 4)[source]¶
async aio_aws.aio_aws_lambda.run_lambda_functions(lambda_functions: List[aio_aws.aio_aws_lambda.AWSLambdaFunction])
Use some default config settings to run lambda functions

lambda_funcs = []
for i in range(5):
    event = {"i": i}
    payload = json.dumps(event).encode()
    func = AWSLambdaFunction(name="lambda_dev", payload=payload)
    lambda_funcs.append(func)

asyncio.run(run_lambda_functions(lambda_funcs))

for func in lambda_funcs:
    assert response_success(func.response)

- Returns
it returns nothing; the lambda function will contain the results of any
lambda response in func.response and func.data
AioAWS S3
Example code for async AWS S3 services. The aiobotocore library wraps a
release of botocore to patch it with features for async coroutines using
asyncio and aiohttp. The simple example code iterates on a list of buckets or
objects to issue a HEAD request, as a simple way to determine whether access
is permitted. The example compares an async approach (using aiobotocore) vs.
a sequential blocking approach (using botocore). The async approach uses a
client connection limiter, based on asyncio.Semaphore(20).
$ ./aio_aws/aio_aws_s3.py
aio-aws check an s3 object that exists
GET HEAD s3://noaa-goes16/index.html -> {'ResponseMetadata': {'RequestId': 'A2F4DF282C84341B', 'HostId': 'Rt45GH1taNRVUAPTg8XJfs4KLvri0OKos/6Wohp5tAfQa+moUT/9mC/0Wa9cYVQZcMAWKtIYfkE=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'Rt45GH1taNRVUAPTg8XJfs4KLvri0OKos/6Wohp5tAfQa+moUT/9mC/0Wa9cYVQZcMAWKtIYfkE=', 'x-amz-request-id': 'A2F4DF282C84341B', 'date': 'Tue, 03 Mar 2020 19:22:21 GMT', 'last-modified': 'Wed, 12 Feb 2020 19:14:27 GMT', 'etag': '"8f9810548f75fd8b8a16431c74ad54ac"', 'content-encoding': 'text/html', 'accept-ranges': 'bytes', 'content-type': 'text/html', 'content-length': '32356', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'AcceptRanges': 'bytes', 'LastModified': datetime.datetime(2020, 2, 12, 19, 14, 27, tzinfo=tzutc()), 'ContentLength': 32356, 'ETag': '"8f9810548f75fd8b8a16431c74ad54ac"', 'ContentEncoding': 'text/html', 'ContentType': 'text/html', 'Metadata': {}}
aio-aws check an s3 object that is missing; it raises.
GET HEAD s3://noaa-goes16/missing.html -> 404: object missing
aio-aws find all buckets that allow access.
queried 72 buckets
found 69 accessible buckets
finished in 2.42 seconds.
aio-aws collection of all objects in a bucket-prefix.
found 9015 s3 objects
finished in 4.43 seconds.
aws-sequential find all buckets that allow access.
queried 72 buckets
found 69 accessible buckets
finished in 31.11 seconds.
aws-sequential collection for all objects in a bucket-prefix
found 9015 s3 objects
finished in 35.72 seconds.
Checking equality of collections for aio-aws vs. aws-sync
The collections for aio-aws vs. aws-sync match OK
class aio_aws.aio_aws_s3.S3Parts(bucket: str, key: str, PROTOCOL: str = 's3://', DELIMITER: str = '/')
Bases: object

DELIMITER: str = '/'

PROTOCOL: str = 's3://'

bucket: str

key: str
static parse_s3_uri(uri: str) → aio_aws.aio_aws_s3.S3Parts

property s3_uri
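For example, a small sketch of parsing an s3 URI (reusing the object from the demo output above):

from aio_aws.aio_aws_s3 import S3Parts

s3_parts = S3Parts.parse_s3_uri("s3://noaa-goes16/index.html")
assert s3_parts.bucket == "noaa-goes16"
assert s3_parts.key == "index.html"
assert s3_parts.s3_uri == "s3://noaa-goes16/index.html"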
async aio_aws.aio_aws_s3.aio_s3_bucket_access(bucket_name: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) → Tuple[str, bool]
Asynchronous coroutine to issue a HEAD request to check if s3 bucket access
is allowed.
- Parameters
bucket_name – an s3 bucket name
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
a tuple of (bucket_name: str, access: bool)
async aio_aws.aio_aws_s3.aio_s3_bucket_head(bucket_name: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) → Dict
Asynchronous coroutine to issue a HEAD request for an s3 bucket.
- Parameters
bucket_name – an s3 bucket name
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
a response to a HEAD request
- Raises
botocore.exceptions.ClientError
-
async
aio_aws.aio_aws_s3.
aio_s3_buckets_access
(buckets: List[str], config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) → Dict[str, bool][source]¶ Asynchronous coroutine to issue HEAD requests on all available s3 buckets to check if each bucket allows access.
- Parameters
buckets – a list of bucket names to check; the default is to issue a request for all buckets in the session region
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
dict of
{bucket_name: str, access: bool}
-
async
aio_aws.aio_aws_s3.
aio_s3_buckets_list
(config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) → Dict[source]¶ Asynchronous coroutine to list all buckets.
- Parameters
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
a response to a ListBuckets request
-
async
aio_aws.aio_aws_s3.
aio_s3_object_access
(s3_uri: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) → Tuple[str, bool][source]¶ Asynchronous coroutine to issue a HEAD request to check if s3 object access is allowed.
- Parameters
s3_uri – an s3 URI
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
a tuple of
(s3_uri: str, access: bool)
-
async
aio_aws.aio_aws_s3.
aio_s3_object_head
(s3_uri: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) → Dict[source]¶ Asynchronous coroutine to issue a HEAD request for s3 object.
- Parameters
s3_uri – an s3 URI
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
a response to a HEAD request
- Raises
botocore.exceptions.ClientError
-
async
aio_aws.aio_aws_s3.
aio_s3_objects_access
(s3_uris: List[str], config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) → Dict[str, bool][source]¶ Asynchronous coroutine to issue HEAD requests on all available s3 objects to check if each s3_object allows access.
- Parameters
s3_uris – a list of s3 uris in the form ‘s3://bucket-name/key’
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
dict of
{s3_uri: str, access: bool}
-
async
aio_aws.aio_aws_s3.
aio_s3_objects_list
(bucket_name: str, bucket_prefix: str, config: aio_aws.aio_aws_config.AioAWSConfig, s3_client: aiobotocore.client.AioBaseClient) → List[Dict][source]¶ Asynchronous coroutine to collect all objects in a bucket prefix.
- Parameters
bucket_name – param passed to s3_client.list_objects_v2 ‘Bucket’
bucket_prefix – param passed to s3_client.list_objects_v2 ‘Prefix’
config – an AioAWSConfig
s3_client – an AioBaseClient for s3
- Returns
a list of s3 object data, e.g.
>>> aio_s3_objects[0]
{'ETag': '"192e29f360ea8297b5876b33b8419741"',
 'Key': 'ABI-L2-ADPC/2019/337/13/OR_ABI-L2-ADPC-M6_G16_s20193371331167_e20193371333539_c20193371334564.nc',
 'LastModified': datetime.datetime(2019, 12, 3, 14, 27, 5, tzinfo=tzutc()),
 'Size': 892913,
 'StorageClass': 'INTELLIGENT_TIERING'}
-
aio_aws.aio_aws_s3.
aws_s3_bucket_access
(bucket_name: str, s3_client: botocore.client.BaseClient) → bool[source]¶ Check access to an S3 bucket by issuing a HEAD request
- Parameters
bucket_name – An s3 bucket name
s3_client – botocore.client.BaseClient for s3
- Returns
True if bucket access is allowed
-
aio_aws.aio_aws_s3.
aws_s3_buckets_access
(s3_client: botocore.client.BaseClient) → Dict[str, bool][source]¶ A concurrent.futures approach to list all s3 buckets and issue a HEAD request to check if access is allowed. This is used in performance comparisons between botocore and aiobotocore functionality.
- Parameters
s3_client – botocore.client.BaseClient for s3
- Returns
dict of
{bucket_name: str, access: bool}
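A sketch of this blocking counterpart, using a plain botocore client:

import botocore.session

from aio_aws.aio_aws_s3 import aws_s3_buckets_access

s3_client = botocore.session.get_session().create_client("s3")
bucket_access = aws_s3_buckets_access(s3_client)
for bucket_name, access in bucket_access.items():
    print(bucket_name, access)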
S3 AIO¶
These functions provide non-blocking serialization to files and s3 for GeoJSON, GeoJSONSeq, JSON, and YAML.
-
async
aio_aws.s3_aio.
geojson_s3_dump
(geojson_data: Any, s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) → Optional[str][source]¶ Write GeoJSON to an s3 URI
- Parameters
geojson_data – an object to json.dump
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
the s3 URI on success
-
async
aio_aws.s3_aio.
geojson_s3_load
(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) → Dict[source]¶ Read GeoJSON data from an s3 object
- Parameters
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
geojson data
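A round-trip sketch for the GeoJSON dump/load pair (the bucket and data are illustrative):

import asyncio

from aio_aws.aio_aws_config import AioAWSConfig
from aio_aws.s3_aio import geojson_s3_dump, geojson_s3_load

geojson = {"type": "FeatureCollection", "features": []}
s3_uri = "s3://your-bucket/prefix/data.geojson"

async def main():
    config = AioAWSConfig()
    async with config.create_client("s3") as s3_client:
        # geojson_s3_dump returns the s3 URI on success, None on failure
        if await geojson_s3_dump(geojson, s3_uri, s3_client):
            data = await geojson_s3_load(s3_uri, s3_client)
            assert data == geojson

asyncio.run(main())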
-
async
aio_aws.s3_aio.
geojsons_dump
(geojson_features: List[Dict], geojsons_file: Union[pathlib.Path, str]) → Optional[Union[pathlib.Path, str]][source]¶ Write a GeoJSON Text Sequence file
- Parameters
geojson_features – a list of geojson features; from any feature collection, this is geojson_collection[“features”]
geojsons_file – a file path to write
- Returns
if the dump succeeds, return the geojsons_file, or None
-
async
aio_aws.s3_aio.
geojsons_s3_dump
(geojson_features: List[Dict], s3uri: str, s3_client: aiobotocore.client.AioBaseClient) → Optional[str][source]¶ Write GeoJSON Text Sequence files to an s3 URI
[GeoJSON Text Sequences](https://tools.ietf.org/html/rfc8142) are lines of geojson features that are designed for streaming operations on large datasets. These files can be loaded by geopandas, using fiona driver=”GeoJSONSeq”, which can be auto-detected. For example:
import geopandas as gpd

s3_uri = "s3://your-bucket/prefix/input.geojsons"
gdf = gpd.read_file(s3_uri)
- Parameters
geojson_features – a list of geojson features; from any feature collection, this is geojson_collection[“features”]
s3uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
the s3 URI on success
-
async
aio_aws.s3_aio.
geojsons_s3_load
(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) → List[Dict][source]¶ Read GeoJSON Text Sequence data from an s3 object
- Parameters
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
geojson features
See also
-
async
aio_aws.s3_aio.
get_s3_content
(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient)[source]¶ Read an s3 object
- Parameters
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
the data from the s3 object
-
async
aio_aws.s3_aio.
json_dump
(data: Any, file: Union[pathlib.Path, str]) → Optional[Union[pathlib.Path, str]][source]¶ Write JSON to a file
- Parameters
data – any data compatible with json.dumps
file – a file path to write
- Returns
if the dump succeeds, return the file, or None
-
async
aio_aws.s3_aio.
json_s3_dump
(json_data: Any, s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) → Optional[str][source]¶ Write JSON to an s3 URI
- Parameters
json_data – an object to json.dump
s3_uri – a fully qualified S3 URI for the s3 object to write
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
the s3 URI on success
-
async
aio_aws.s3_aio.
json_s3_load
(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) → Any[source]¶ Read JSON data from an s3 object
- Parameters
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
data from the json load
-
async
aio_aws.s3_aio.
put_s3_content
(data_file: str, s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) → Optional[str][source]¶ Write a file to an s3 object
- Parameters
data_file – a data file
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
the s3 URI on success
-
aio_aws.s3_aio.
run_s3_load_files
(s3_uris: List[str], *args, **kwargs) → Dict[str, Optional[Any]][source]¶ Collect data from S3 files in JSON or YAML formats
- Parameters
s3_uris – a list of S3 URIs
- Returns
a Dict[s3_uri: s3_data] for all the s3 URIs that can be read successfully
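A usage sketch, assuming the S3 objects exist and hold JSON or YAML:

from aio_aws.s3_aio import run_s3_load_files

s3_uris = [
    "s3://your-bucket/prefix/a.json",
    "s3://your-bucket/prefix/b.yaml",
]
s3_data = run_s3_load_files(s3_uris)
for s3_uri, data in s3_data.items():
    print(s3_uri, "loaded" if data is not None else "failed")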
-
aio_aws.s3_aio.
s3_aio_client
(*args, **kwargs) → aiobotocore.client.AioBaseClient[source]¶ This creates an aiobotocore.client.AioBaseClient that uses aiohttp for asynchronous s3 requests
- Returns
an aiobotocore.client.AioBaseClient for s3
-
async
aio_aws.s3_aio.
s3_file_info
(s3_uri: Union[aio_aws.s3_uri.S3URI, str], s3_client: aiobotocore.client.AioBaseClient) → aio_aws.s3_uri.S3Info[source]¶ Collect data from an S3 HEAD request for an S3URI
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
an S3Info object with HEAD data on success; on failure the S3Info object has no HEAD data
-
async
aio_aws.s3_aio.
s3_files_info
(s3_uris: List[Union[aio_aws.s3_uri.S3URI, str]], s3_client: aiobotocore.client.AioBaseClient) → List[aio_aws.s3_uri.S3Info][source]¶ Collect data from S3 HEAD requests for many S3URI
- Parameters
s3_uris – a list of S3URI
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
a list of S3Info object with HEAD data on success; on failure the S3Info object has no HEAD data
-
async
aio_aws.s3_aio.
s3_load_file
(s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) → Tuple[str, Optional[Any]][source]¶ Load various file types from s3; it supports files with a known file suffix, such as “.json”, “.geojson”, “.geojsons”, “.yaml”, “.yml”
- Parameters
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
a Tuple[s3_uri, Optional[s3_data]] where the s3_data could be None if the file type is not recognized or a file read fails
-
async
aio_aws.s3_aio.
s3_load_files
(s3_uris: List[str], *args, s3_client: Optional[aiobotocore.client.AioBaseClient] = None, **kwargs) → Dict[str, Optional[Any]][source]¶ Collect data from S3 files in JSON or YAML formats
- Parameters
s3_uris – a list of S3 URIs
s3_client – an optional aiobotocore.client.AioBaseClient for s3
- Returns
a Dict[s3_uri: s3_data] for all the s3 URIs that can be read successfully
-
async
aio_aws.s3_aio.
yaml_dump
(data: Any, file: Union[pathlib.Path, str]) → Optional[Union[pathlib.Path, str]][source]¶ Write YAML to a file
- Parameters
data – any data compatible with yaml.safe_dump
file – a file path to write
- Returns
if the dump succeeds, return the file, or None
-
async
aio_aws.s3_aio.
yaml_s3_dump
(yaml_data: Any, s3_uri: str, s3_client: aiobotocore.client.AioBaseClient) → Optional[str][source]¶ Write YAML to an s3 URI
- Parameters
yaml_data – an object to yaml.dump
s3_uri – a fully qualified S3 URI for an s3 object
s3_client – a required aiobotocore.client.AioBaseClient for s3
- Returns
the s3 URI on success
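The s3_aio coroutines are designed to share one client; a sketch that gathers several JSON dumps concurrently (the bucket is illustrative):

import asyncio

from aio_aws.aio_aws_config import AioAWSConfig
from aio_aws.s3_aio import json_s3_dump

async def main():
    config = AioAWSConfig()
    async with config.create_client("s3") as s3_client:
        tasks = [
            json_s3_dump({"i": i}, f"s3://your-bucket/prefix/{i}.json", s3_client)
            for i in range(5)
        ]
        # each result is the s3 URI on success, None on failure
        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())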
S3 IO¶
These functions provide blocking serialization to files and s3 for GeoJSON, GeoJSONSeq, JSON, and YAML.
-
aio_aws.s3_io.
geojson_s3_dump
(geojson_data: Any, s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None) → Optional[str][source]¶ Write GeoJSON to an s3 URI
- Parameters
geojson_data – an object to json.dump
s3_uri – a fully qualified S3 URI for the s3 object to write
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the s3 URI on success
-
aio_aws.s3_io.
geojson_s3_load
(s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None) → Dict[source]¶ Read GeoJSON data from an s3 object
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – an optional botocore.client.BaseClient for s3
- Returns
geojson data
-
aio_aws.s3_io.
geojsons_dump
(geojson_features: List[Dict], geojsons_file: Union[pathlib.Path, str]) → Optional[Union[pathlib.Path, str]][source]¶ Write a GeoJSON Text Sequence file
- Parameters
geojson_features – a list of geojson features; from any feature collection, this is geojson_collection[“features”]
geojsons_file – a file path to write
- Returns
if the dump succeeds, return the geojsons_file, or None
-
aio_aws.s3_io.
geojsons_s3_dump
(geojson_features: List[Dict], s3uri: str, s3_client: Optional[botocore.client.BaseClient] = None) → Optional[str][source]¶ Write GeoJSON Text Sequence files to an s3 URI
[GeoJSON Text Sequences](https://tools.ietf.org/html/rfc8142) are lines of geojson features that are designed for streaming operations on large datasets. These files can be loaded by geopandas, using fiona driver=”GeoJSONSeq”, which can be auto-detected. For example:
import geopandas as gpd

s3_uri = "s3://your-bucket/prefix/input.geojsons"
gdf = gpd.read_file(s3_uri)
- Parameters
geojson_features – a list of geojson features; from any feature collection, this is geojson_collection[“features”]
s3uri – a fully qualified S3 URI for the s3 object to write
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the s3 URI on success
-
aio_aws.s3_io.
geojsons_s3_load
(s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None) → List[Dict][source]¶ Read GeoJSON Text Sequence data from an s3 object
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – an optional botocore.client.BaseClient for s3
- Returns
geojson features
See also
-
aio_aws.s3_io.
get_s3_content
(s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None)[source]¶ Read an s3 URI
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the data from the s3 object
-
aio_aws.s3_io.
json_s3_dump
(json_data: Any, s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None) → Optional[str][source]¶ Write JSON to an s3 URI
- Parameters
json_data – an object to json.dump
s3_uri – a fully qualified S3 URI for the s3 object to write
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the s3 URI on success
-
aio_aws.s3_io.
json_s3_load
(s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None) → Any[source]¶ Load JSON from an s3 URI
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – an optional botocore.client.BaseClient for s3
- Returns
data from the json load
-
aio_aws.s3_io.
put_s3_content
(data_file: str, s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None) → Optional[str][source]¶ Write a file to an s3 URI
- Parameters
data_file – a data file
s3_uri – a fully qualified S3 URI for the s3 object to write
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the s3 URI on success
-
aio_aws.s3_io.
s3_file_info
(s3_uri: Union[aio_aws.s3_uri.S3URI, str], s3_client: Optional[botocore.client.BaseClient] = None) → aio_aws.s3_uri.S3Info[source]¶ Collect data from an S3 HEAD request for an S3URI
- Parameters
s3_uri – a fully qualified S3 URI for the s3 object to read
s3_client – an optional botocore.client.BaseClient for s3
- Returns
an S3Info object with HEAD data on success; on failure the S3Info object has no HEAD data
-
aio_aws.s3_io.
s3_files_info
(s3_uris: List[Union[aio_aws.s3_uri.S3URI, str]], s3_client: Optional[botocore.client.BaseClient] = None) → List[aio_aws.s3_uri.S3Info][source]¶ Collect data from S3 HEAD requests for many S3URI
-
aio_aws.s3_io.
s3_io_client
() → botocore.client.BaseClient[source]¶ This creates a botocore.client.BaseClient that uses urllib3, which should be thread-safe, with a default connection pool.
This uses a botocore.config.Config with 3 retries using a standard backoff method.
- Returns
a botocore.client.BaseClient for s3
-
aio_aws.s3_io.
yaml_s3_dump
(yaml_data: Any, s3_uri: str, s3_client: Optional[botocore.client.BaseClient] = None) → Optional[str][source]¶ Write YAML to an s3 URI
- Parameters
yaml_data – an object to yaml.dump
s3_uri – a fully qualified S3 URI for the s3 object to write
s3_client – an optional botocore.client.BaseClient for s3
- Returns
the s3 URI on success
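A blocking round-trip sketch using one shared client from s3_io_client (the bucket is illustrative):

from aio_aws.s3_io import json_s3_dump, json_s3_load, s3_io_client

s3_client = s3_io_client()
s3_uri = "s3://your-bucket/prefix/config.json"
# json_s3_dump returns the s3 URI on success, None on failure
if json_s3_dump({"answer": 42}, s3_uri, s3_client=s3_client):
    data = json_s3_load(s3_uri, s3_client=s3_client)
    assert data == {"answer": 42}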
S3URI¶
-
class
aio_aws.s3_uri.
S3Info
(s3_uri: aio_aws.s3_uri.S3URI, s3_size: int = None, last_modified: datetime.datetime = None)[source]¶ Bases:
object
-
property
dict
¶
-
property
iso8601
¶
-
property
json
¶
-
last_modified
: datetime.datetime = None¶
-
s3_size
: int = None¶
-
s3_uri
: aio_aws.s3_uri.S3URI¶
-
class
aio_aws.s3_uri.
S3Object
(bucket: str, key: str)[source]¶ Bases:
object
Just the bucket_name and key for an s3.ObjectSummary. This simple dataclass should work around problems with Pickle for an s3.ObjectSummary.

from aio_aws.s3_uri import S3Object

# assume obj is an s3.ObjectSummary
S3Object(bucket=obj.bucket_name, key=obj.key)
-
bucket
: str¶
-
key
: str¶
-
property
s3_uri
¶ s3_uri: str for
s3://{bucket}/{key}
-
-
class
aio_aws.s3_uri.
S3Paths
(s3_uri: str = '', bucket: str = '', key: str = '')[source]¶ Bases:
object
S3 URI components
-
property
bucket
¶
-
glob_file_pattern
()[source]¶ Construct a glob pattern relative to the key_path. The final glob pattern is:
{key_path}/**/{file_stem}*.*
- Returns
str
-
glob_pattern
(glob_pattern: str = '**/*')[source]¶ Construct a glob pattern relative to the key_path. The final glob pattern is:
str(PurePosixPath(self.key_path) / glob_pattern)
- Parameters
glob_pattern – str defaults to
'**/*'
- Returns
str
-
property
key
¶
-
property
key_file
¶
-
property
key_path
¶
-
static
parse_s3_uri
(s3_uri: str) → aio_aws.s3_uri.S3Paths[source]¶ Parse an S3 URI into components; the delimiter must be ‘/’
- Parameters
s3_uri –
- Returns
S3Paths(bucket, key, key_path, key_file)
-
property
protocol
¶
-
property
s3_uri
¶ s3_uri: str for
s3://{bucket}/{key}
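A sketch of S3Paths parsing and glob construction, assuming the constructor parses a given s3_uri; the property values shown are assumptions based on the parse_s3_uri description:

from aio_aws.s3_uri import S3Paths

s3_paths = S3Paths(s3_uri="s3://your-bucket/a/b/file.json")
assert s3_paths.bucket == "your-bucket"
assert s3_paths.key == "a/b/file.json"
assert s3_paths.key_path == "a/b"
assert s3_paths.key_file == "file.json"
# str(PurePosixPath("a/b") / "**/*")
assert s3_paths.glob_pattern() == "a/b/**/*"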
-
class
aio_aws.s3_uri.
S3URI
(s3_uri: str = '', bucket: str = '', key: str = '')[source]¶ Bases:
aio_aws.s3_uri.S3Paths
S3 URI representation and parsing.
At present, this class is designed to represent one s3 object, although it has some utilities for working with general bucket and key paths.
For technical details on S3 Objects, refer to
https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
The following are the rules for naming S3 buckets in all AWS Regions:
Bucket names must follow DNS-compliant naming conventions. Amazon S3 no longer supports creating bucket names that contain uppercase letters or underscores. This ensures that each bucket can be addressed using virtual host style addressing, such as https://myawsbucket.s3.amazonaws.com.
Bucket names must be unique across all existing bucket names in Amazon S3.
Bucket names must comply with DNS naming conventions.
Bucket names must be at least 3 and no more than 63 characters long.
Bucket names must not contain uppercase characters or underscores.
Bucket names must start with a lowercase letter or number.
Bucket names must be a series of one or more labels. Adjacent labels are separated by a single period (.). Bucket names can contain lowercase letters, numbers, and hyphens. Each label must start and end with a lowercase letter or a number.
Bucket names must not be formatted as an IP address (e.g., 192.168.5.4).
When you use virtual hosted–style buckets with Secure Sockets Layer (SSL), the SSL wildcard certificate only matches buckets that don’t contain periods. To work around this, use HTTP or write your own certificate verification logic. We recommend that you do not use periods (“.”) in bucket names when using virtual hosted-style buckets.
The following are the rules for naming S3 keys:
The name for a key is a sequence of Unicode characters whose UTF-8 encoding is at most 1024 bytes long.
- Parameters
bucket – str for bucket in
s3://{bucket}/{key}
key – str for key in
s3://{bucket}/{key}
-
static
parse_s3_uri
(s3_uri: str) → aio_aws.s3_uri.S3URI[source]¶ Parse an S3 URI into components; the delimiter must be ‘/’
- Parameters
s3_uri –
- Returns
S3URI(bucket, key, key_path, key_file)
-
s3_derivatives
() → Iterable[source]¶ Use
s3_objects(glob_file_pattern)
to find all the derivatives with matching file names (key_file*.*) below the path of this file.
- Returns
Iterable[s3.ObjectSummary]
-
s3_exists
() → bool[source]¶ Check the s3 file summary to confirm a file exists.
- Returns
True if the s3 file exists
-
s3_head_request
() → dict[source]¶ Issue an HTTP HEAD request to get the s3 URI summary data.
- Returns
dict of HEAD response data or an empty dict if it fails.
-
s3_object_summary
() → s3.ObjectSummary[source]¶ Create an s3.ObjectSummary for this S3URI; this does not check if the object exists.
- Returns
s3.ObjectSummary
-
s3_objects
(glob_pattern: Optional[str] = None, prefix: Optional[str] = None) → Iterable[source]¶ Search all the s3 objects at or below the s3_uri.bucket, using an optional filter prefix and/or a glob pattern (defaults to everything).
Using the s3_uri.key_path as a filter prefix is most often critical to limit the number of bucket objects to process. Any glob pattern is then applied to each s3.ObjectSummary.key in the results.
- Parameters
prefix – str to specify a Prefix filter on object keys
glob_pattern – str to specify a filter based on glob patterns
- Returns
Iterable[s3.ObjectSummary]
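A sketch of typical S3URI usage, assuming AWS credentials and an existing object:

from aio_aws.s3_uri import S3URI

s3_uri = S3URI("s3://your-bucket/a/b/file.json")
if s3_uri.s3_exists():
    head = s3_uri.s3_head_request()
    print(head.get("ContentLength"))
    # list ObjectSummary entries below this key_path
    for summary in s3_uri.s3_objects(prefix=s3_uri.key_path):
        print(summary.key)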
-
aio_aws.s3_uri.
bucket_validate
(bucket_name: str) → bool[source]¶ For technical details on S3 buckets, refer to
https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
The following are the rules for naming S3 buckets in all AWS Regions:
Bucket names must follow DNS-compliant naming conventions. Amazon S3 no longer supports creating bucket names that contain uppercase letters or underscores. This ensures that each bucket can be addressed using virtual host style addressing, such as https://myawsbucket.s3.amazonaws.com.
Bucket names must be unique across all existing bucket names in Amazon S3.
Bucket names must comply with DNS naming conventions.
Bucket names must be at least 3 and no more than 63 characters long.
Bucket names must not contain uppercase characters or underscores.
Bucket names must start with a lowercase letter or number.
Bucket names must be a series of one or more labels. Adjacent labels are separated by a single period (.). Bucket names can contain lowercase letters, numbers, and hyphens. Each label must start and end with a lowercase letter or a number.
Bucket names must not be formatted as an IP address (e.g., 192.168.5.4).
When you use virtual hosted–style buckets with Secure Sockets Layer (SSL), the SSL wildcard certificate only matches buckets that don’t contain periods. To work around this, use HTTP or write your own certificate verification logic. We recommend that you do not use periods (“.”) in bucket names when using virtual hosted-style buckets.
- Parameters
bucket_name –
- Returns
True if the bucket_name is valid
- Raises
ValueError if the bucket_name is not valid
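For example:

from aio_aws.s3_uri import bucket_validate

try:
    bucket_validate("Invalid_Bucket_Name")
except ValueError as err:
    print(f"invalid bucket name: {err}")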
-
aio_aws.utils.
datetime_to_http_date
(dt: datetime.datetime) → str[source]¶ Parse a datetime into an HTTP date string (using GMT).
- Parameters
dt – a datetime
- Returns
an HTTP date string, e.g. “Mon, 23 Mar 2020 15:29:33 GMT”
-
aio_aws.utils.
datetime_to_unix_milliseconds
(dt: datetime.datetime) → int[source]¶ Convert a datetime to a Unix timestamp - time since the epoch in milliseconds.
-
aio_aws.utils.
handle_head_error_code
(error: botocore.exceptions.ClientError, item: Optional[str] = None) → Optional[bool][source]¶
-
aio_aws.utils.
http_date_to_datetime
(http_date: str) → datetime.datetime[source]¶ Parse an HTTP date string into a datetime.
- Parameters
http_date – e.g. “Mon, 23 Mar 2020 15:29:33 GMT”
- Returns
a datetime
-
aio_aws.utils.
http_date_to_timestamp
(http_date: str) → float[source]¶ Parse an HTTP date string into a timestamp as seconds since the epoch.
- Parameters
http_date – e.g. “Mon, 23 Mar 2020 15:29:33 GMT”
- Returns
a timestamp (seconds since the epoch)
-
aio_aws.utils.
timestamp_to_http_date
(ts: float) → str[source]¶ Parse a timestamp (seconds since the epoch) into an HTTP date string (using GMT).
- Parameters
ts – a timestamp (seconds since the epoch)
- Returns
an HTTP date string, e.g. “Mon, 23 Mar 2020 15:29:33 GMT”
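A round-trip sketch for these date helpers, using the HTTP date from the docstrings:

from aio_aws.utils import (
    datetime_to_http_date,
    http_date_to_datetime,
    http_date_to_timestamp,
    timestamp_to_http_date,
)

http_date = "Mon, 23 Mar 2020 15:29:33 GMT"
dt = http_date_to_datetime(http_date)
assert datetime_to_http_date(dt) == http_date
ts = http_date_to_timestamp(http_date)
assert timestamp_to_http_date(ts) == http_date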
-
aio_aws.utils.
utc_unix_milliseconds
() → int[source]¶ Unix timestamp - time since the epoch in milliseconds
-
aio_aws.uuid_utils.
generate_hex_uuids
(n_uuids: int) → Iterator[str][source]¶ Generate an iterator of UUIDs as hex strings
-
aio_aws.uuid_utils.
generate_uuids
(n_uuids: int) → Iterator[str][source]¶ Generate an iterator of UUIDs
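A usage sketch for both generators; the hex output format is an assumption based on the function name:

from aio_aws.uuid_utils import generate_hex_uuids, generate_uuids

uuids = list(generate_uuids(3))          # canonical UUID strings
hex_uuids = list(generate_hex_uuids(3))  # assumed uuid4().hex strings
assert len(uuids) == 3 and len(hex_uuids) == 3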