Source code for aio_aws.aio_aws_batch_db

# Copyright 2019-2021 Darren Weber
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
AioAWS Batch-DB
===============

"""

import abc
import asyncio
import json
from collections import Counter
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import AsyncIterator
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple

import tinydb

from aio_aws.aws_batch_models import AWSBatchJob
from aio_aws.aws_batch_models import AWSBatchJobStates
from aio_aws.logger import get_logger
from aio_aws.uuid_utils import valid_uuid4

LOGGER = get_logger(__name__)


[docs]@dataclass class AioAWSBatchDB(abc.ABC): """ Abstract Base Class for AWS Batch job databases """
[docs] @abc.abstractmethod async def all_jobs(self) -> List[AWSBatchJob]: """ Collect all jobs. Warning: this could exceed memory, try to use the :py:meth:`gen_all_jobs` wherever possible. """ pass
[docs] @abc.abstractmethod async def gen_all_jobs(self) -> AsyncIterator[AWSBatchJob]: """ Generate all jobs. """ yield
[docs] @abc.abstractmethod async def all_job_ids(self) -> Set[str]: """ Collect all jobIds. """ pass
[docs] @abc.abstractmethod async def gen_job_ids(self) -> AsyncIterator[str]: """ Generate all jobIds. """ yield
[docs] @abc.abstractmethod async def count_by_job_status(self) -> Counter: """ Count all jobs by jobStatus :return: a Counter of jobs by job status (could contain multiple entries for the same jobName, if it is run more than once) """ pass
[docs] @abc.abstractmethod async def group_by_job_status(self) -> Dict[str, List[Tuple[str, str, str]]]: """ Group all jobId by jobStatus :return: a dictionary of job info by job status (could contain multiple entries with the same jobName if it is run multiple times); the dictionary values contain a list of job information tuples, e.g. .. code-block:: { status: [ (job.job_id, job.job_name, job.status), ] } """ pass
[docs] @abc.abstractmethod async def find_by_job_id(self, job_id: str) -> Optional[Dict]: """ Find one job by the jobId :param job_id: a batch jobId :return: the job data or None """ pass
[docs] @abc.abstractmethod async def find_by_job_name(self, job_name: str) -> List[Dict]: """ Find any jobs matching the jobName :param job_name: a batch jobName :return: a list of dictionaries containing job data """ pass
[docs] @abc.abstractmethod async def find_latest_job_name(self, job_name: str) -> Optional[AWSBatchJob]: """ Find the latest job matching the jobName, based on the "createdAt" time stamp. :param job_name: a batch jobName :return: the latest job record available """ pass
[docs] @abc.abstractmethod async def find_by_job_status(self, job_states: List[str]) -> List[AWSBatchJob]: """ Find any jobs matching any job status values :param job_states: a list of valid job states :return: a list of AWSBatchJob (could contain multiple entries with the same jobName if it is run multiple times with any jobStatus in the jobStatus values) """ pass
[docs] @abc.abstractmethod async def remove_by_job_id(self, job_id: str) -> Optional[Dict]: """ Remove any job matching the jobId :param job_id: a batch jobId :return: a deleted document """ pass
[docs] @abc.abstractmethod async def remove_by_job_name(self, job_name: str) -> Set[str]: """ Remove any jobs matching the jobName :param job_name: a batch jobName :return: a set of deleted job-ids """ pass
[docs] @abc.abstractmethod async def save_job(self, job: AWSBatchJob) -> Optional[str]: """ Insert or update a job (if it has a job_id) :param job: an AWSBatchJob :return: the jobId if the job is saved """ pass
[docs] @abc.abstractmethod async def find_job_logs(self, job_id: str) -> Optional[Dict]: """ Find job logs by job_id :param job_id: str :return: job logs document """ pass
[docs] @abc.abstractmethod async def save_job_logs(self, job: AWSBatchJob) -> List[int]: """ Insert or update job logs (if the job has a job_id) :param job: an AWSBatchJob :return: a List[tinydb.database.Document.doc_id] """ pass
[docs] @abc.abstractmethod async def find_jobs_to_run(self) -> List[AWSBatchJob]: """ Find all jobs that have not SUCCEEDED. """ pass
[docs] @abc.abstractmethod async def jobs_to_run(self, jobs: List[AWSBatchJob]) -> List[AWSBatchJob]: """ Use the job.job_name to find any jobs_db records and check the job status on the latest submission of any job matching the job-name. For any job that has a matching job-name in the jobs_db, check whether the job has already run and SUCCEEDED. Any jobs that have already run and SUCCEEDED are not returned. Return all jobs that have not run yet or have not SUCCEEDED. Note that any jobs subsequently input to the job-manager will not re-run if they already have a job.job_id, those jobs will be monitored until complete. To re-run jobs that are returned from this filter function, first use `job.reset()` before passing the jobs to the job-manager. .. note:: The input jobs are not modified in any way in this function, i.e. there are no updates from the jobs_db applied to the input jobs. To recover jobs from the jobs_db, use - :py:meth:`AWSBatchDB.find_jobs_to_run()` - :py:meth:`AWSBatchDB.jobs_db.all()` """ pass
[docs] @abc.abstractmethod async def jobs_recovery(self, jobs: List[AWSBatchJob]) -> List[AWSBatchJob]: """ Use the job.job_name to find any jobs_db records to recover job data. """ pass
[docs]@dataclass class AioAWSBatchRedisDB(AioAWSBatchDB): """ AWS Batch job databases - aioredis implementation The jobs and logs are kept in separate DBs so that performance on the jobs_db is not compromised by too much data from job logs. This makes it possible to use the jobs_db without capturing logs as well. """ redis_url: str = "redis://127.0.0.1:6379" # TODO: use files to dump redis-db? # #: a file used for dumping jobs-db # jobs_db_file: str = "/tmp/aws_batch_jobs.json" # #: a file used for dumping logs-db # logs_db_file: str = "/tmp/aws_batch_logs.json" def __post_init__(self): # the jobs and logs are kept in separate DBs so # that performance on the jobs_db is not compromised # by too much data from job logs. # Lazy init for any asyncio instances self._db_sem = None @property async def jobs_db(self): # lazy load this optional dependency import aioredis # need to instantiate redis-connection on-demand to avoid # Future <Future pending> attached to a different loop # TODO: lean how to optimize connections to auto-close them async with self.db_semaphore: return aioredis.from_url( self.redis_url, db=2, encoding="utf-8", decode_responses=True, max_connections=1, ) @property async def logs_db(self): # lazy load this optional dependency import aioredis # need to instantiate redis-connection on-demand to avoid # Future <Future pending> attached to a different loop async with self.db_semaphore: return aioredis.from_url( self.redis_url, db=4, encoding="utf-8", decode_responses=True, max_connections=1, ) @property def db_semaphore(self) -> asyncio.Semaphore: """A semaphore to limit requests to the db""" if self._db_sem is None: self._db_sem = asyncio.Semaphore(100) return self._db_sem @classmethod async def _is_db_alive(cls, db) -> bool: try: assert await db.ping() is True return True except AssertionError: return False @property async def jobs_db_alive(self) -> "Redis": for tries in range(5): jobs_db = await self.jobs_db db_alive = await self._is_db_alive(jobs_db) if db_alive: return jobs_db else: await asyncio.sleep(0.0001, 0.005) @property async def logs_db_alive(self) -> "Redis": for tries in range(5): logs_db = await self.logs_db db_alive = await self._is_db_alive(logs_db) if db_alive: return logs_db else: await asyncio.sleep(0.0001, 0.005) @property async def db_alive(self) -> bool: try: assert await self.jobs_db_alive assert await self.logs_db_alive return True except AssertionError: return False @property async def db_info(self) -> Dict: jobs_db = await self.jobs_db_alive logs_db = await self.logs_db_alive info = { "jobs": await jobs_db.info(), "logs": await logs_db.info(), } LOGGER.debug("Using batch-jobs redis-db: %s", info) return info # async def db_close(self): # await self.jobs_db.close() # await self.logs_db.close()
[docs] async def db_save(self): jobs_db = await self.jobs_db_alive logs_db = await self.logs_db_alive assert await jobs_db.bgsave() is True assert await logs_db.bgsave() is True
@classmethod async def _find_by_job_id(cls, job_id: str, jobs_db: "Redis") -> Dict: """ Private method to find any job matching the jobId :param job_id: a batch jobName :param jobs_db: an async Redis :return: the job data or None """ if job_id: job_json = await jobs_db.get(job_id) if job_json: return json.loads(job_json) @classmethod async def _find_by_job_name(cls, job_name: str, jobs_db: "Redis") -> List[Dict]: """ Private method to find any jobs matching the jobName :param job_name: a batch jobName :param jobs_db: an async Redis :return: a list of dictionaries containing job data """ jobs = [] if job_name: job_ids = await jobs_db.get(job_name) if job_ids: # This should be a list of batch job-ids for a job name job_ids = set(json.loads(job_ids)) for job_id in job_ids: job_json = await jobs_db.get(job_id) if job_json: job_dict = json.loads(job_json) jobs.append(job_dict) return jobs
[docs] async def all_job_ids(self) -> Set[str]: """ Collect all jobIds. """ job_ids = set() async for key in self.gen_job_ids(): job_ids.add(key) return job_ids
[docs] async def gen_job_ids(self) -> AsyncIterator[str]: """ Generate all jobIds. """ jobs_db = await self.jobs_db_alive async for key in jobs_db.scan_iter(): if valid_uuid4(key): yield key
[docs] async def all_jobs(self) -> List[AWSBatchJob]: """ Collect all jobs. Warning: this could exceed memory, try to use the :py:meth:`gen_all_jobs` wherever possible. """ jobs = [] async for j in self.gen_all_jobs(): jobs.append(j) return jobs
[docs] async def gen_all_jobs(self) -> AsyncIterator[AWSBatchJob]: """ Generate all jobs. """ jobs_db = await self.jobs_db_alive async for key in jobs_db.scan_iter(): if valid_uuid4(key): # The key is a jobId that conforms to UUID job_json = await jobs_db.get(key) if job_json: job_dict = json.loads(job_json) yield AWSBatchJob(**job_dict)
[docs] async def count_by_job_status(self) -> Counter: """ Count all jobs by jobStatus :return: a Counter of jobs by job status (could contain multiple entries for the same jobName, if it is run more than once) """ counter = Counter() for job_status in AWSBatchJobStates: counter[job_status.name] = 0 jobs_db = await self.jobs_db_alive async for key in jobs_db.scan_iter(): if valid_uuid4(key): # The key is a jobId that conforms to UUID job_json = await jobs_db.get(key) if job_json: job_dict = json.loads(job_json) job = AWSBatchJob(**job_dict) counter[job.status] += 1 return counter
[docs] async def group_by_job_status(self) -> Dict[str, List[Tuple[str, str, str]]]: """ Group all jobId by jobStatus :return: a dictionary of job info by job status (could contain multiple entries with the same jobName if it is run multiple times); the dictionary values contain a list of job information tuples, e.g. .. code-block:: { status: [ (job.job_id, job.job_name, job.status), ] } """ groups = defaultdict(list) for job_status in AWSBatchJobStates: groups[job_status.name] = [] jobs_db = await self.jobs_db_alive async for key in jobs_db.scan_iter(): if valid_uuid4(key): # The key is a jobId that conforms to UUID job_json = await jobs_db.get(key) if job_json: job_dict = json.loads(job_json) job = AWSBatchJob(**job_dict) job_info = (job.job_id, job.job_name, job.status) groups[job.status].append(job_info) return groups
[docs] async def find_by_job_id(self, job_id: str) -> Optional[Dict]: """ Find one job by the jobId :param job_id: a batch jobId :return: the job data or None """ if job_id: jobs_db = await self.jobs_db_alive return await self._find_by_job_id(job_id=job_id, jobs_db=jobs_db)
[docs] async def find_by_job_name(self, job_name: str) -> List[Dict]: """ Find any jobs matching the jobName :param job_name: a batch jobName :return: a list of dictionaries containing job data """ if job_name: jobs_db = await self.jobs_db_alive return await self._find_by_job_name(job_name=job_name, jobs_db=jobs_db)
[docs] async def find_latest_job_name(self, job_name: str) -> Optional[AWSBatchJob]: """ Find the latest job matching the jobName, based on the "createdAt" time stamp. :param job_name: a batch jobName :return: the latest job record available """ jobs_saved = await self.find_by_job_name(job_name) if jobs_saved: db_jobs = [AWSBatchJob(**job_dict) for job_dict in jobs_saved] db_jobs = sorted(db_jobs, key=lambda j: j.created or j.submitted) db_job = db_jobs[-1] return db_job
[docs] async def find_by_job_status(self, job_states: List[str]) -> List[AWSBatchJob]: """ Find any jobs matching any jobStatus values :param job_states: a list of valid job status values :return: a list of AWSBatchJob (could contain multiple entries with the same jobName if it is run multiple times with any jobStatus in the jobStatus values) """ for status in job_states: assert status in AWSBatchJob.STATES jobs = [] jobs_db = await self.jobs_db_alive async for key in jobs_db.scan_iter(): if valid_uuid4(key): # The key is a jobId that conforms to UUID job_json = await jobs_db.get(key) if job_json: job_dict = json.loads(job_json) job = AWSBatchJob(**job_dict) if job.status in job_states: jobs.append(job) return jobs
[docs] async def remove_by_job_id(self, job_id: str) -> Optional[Dict]: """ Remove any job matching the jobId :param job_id: a batch jobId :return: a deleted document """ # TODO: use a transaction if job_id: jobs_db = await self.jobs_db_alive job_dict = await self._find_by_job_id(job_id=job_id, jobs_db=jobs_db) if job_dict: # First try to remove this job-id from the job-name job_name = job_dict["job_name"] job_name_doc = await jobs_db.get(job_name) if job_name_doc: # This should be a list of batch job-ids for a job name job_ids: List = json.loads(job_name_doc) if job_id in job_ids: job_ids.remove(job_id) if job_ids: job_name_doc = json.dumps(list(job_ids)) await jobs_db.set(job_name, job_name_doc) else: # Don't save an empty set, delete the job-name entirely await jobs_db.delete(job_name) await jobs_db.delete(job_id) return job_dict
[docs] async def remove_by_job_name(self, job_name: str) -> Set[str]: """ Remove any jobs matching the jobName :param job_name: a batch jobName :return: a set of deleted job-ids """ if job_name: jobs_db = await self.jobs_db_alive job_name_doc = await jobs_db.get(job_name) if job_name_doc: job_ids = set(json.loads(job_name_doc)) await jobs_db.delete(*job_ids) await jobs_db.delete(job_name) return job_ids
[docs] async def save_job(self, job: AWSBatchJob) -> Optional[str]: """ Insert or update a job (if it has a job_id) :param job: an AWSBatchJob :return: the jobId if the job is saved """ if job.job_id is None: LOGGER.error("FAIL to save_job without job_id") return # TODO: use a transaction jobs_db = await self.jobs_db_alive job_json = json.dumps(job.db_data) for tries in range(3): try: response = await jobs_db.set(job.job_id, job_json) LOGGER.debug(response) if response: LOGGER.info( "AWS batch-db (%s:%s) saved status: %s", job.job_name, job.job_id, job.status, ) # Update the job-id set for the job-name job_ids = [] job_name_doc = await jobs_db.get(job.job_name) if job_name_doc: job_ids = json.loads(job_name_doc) if sorted(job_ids) != sorted(set(job_ids)): LOGGER.warning("jobs-db has duplicates for %s", job.job_name) if job.job_id not in job_ids: job_ids.append(job.job_id) job_ids = json.dumps(job_ids) await jobs_db.set(job.job_name, job_ids) return job.job_id except Exception as err: LOGGER.error(err)
[docs] async def find_job_logs(self, job_id: str) -> Optional[Dict]: """ Find job logs by job_id :param job_id: str :return: job logs document """ if job_id: logs_db = await self.logs_db_alive job_logs = await logs_db.get(job_id) if job_logs: return json.loads(job_logs) LOGGER.error("FAIL to find_job_logs with job_id: %s", job_id)
[docs] async def save_job_logs(self, job: AWSBatchJob) -> List[int]: """ Insert or update job logs (if the job has a job_id) :param job: an AWSBatchJob :return: a List[tinydb.database.Document.doc_id] """ # TODO: update return data type and content if job.job_id: logs_db = await self.logs_db_alive return await logs_db.set(job.job_id, json.dumps(job.db_logs_data)) LOGGER.error("FAIL to save_job_logs")
[docs] async def find_jobs_to_run(self) -> List[AWSBatchJob]: """ Find all jobs that have not SUCCEEDED. """ jobs_outstanding = [] jobs_db = await self.jobs_db_alive async for key in jobs_db.scan_iter(): if valid_uuid4(key): job_dict = await self._find_by_job_id(job_id=key, jobs_db=jobs_db) job = AWSBatchJob(**job_dict) LOGGER.debug( "AWS batch-db (%s:%s) status: %s", job.job_name, job.job_id, job.status, ) if job.job_id and job.status == "SUCCEEDED": LOGGER.debug(job.job_description) continue jobs_outstanding.append(job) return jobs_outstanding
[docs] async def jobs_to_run(self, jobs: List[AWSBatchJob]) -> List[AWSBatchJob]: """ Use the job.job_name to find any jobs_db records and check the job status on the latest submission of any job matching the job-name. For any job that has a matching job-name in the jobs_db, check whether the job has already run and SUCCEEDED. Any jobs that have already run and SUCCEEDED are not returned. Return all jobs that have not run yet or have not SUCCEEDED. Note that any jobs subsequently input to the job-manager will not re-run if they already have a job.job_id, those jobs will be monitored until complete. To re-run jobs that are returned from this filter function, first use `job.reset()` before passing the jobs to the job-manager. .. note:: The input jobs are not modified in any way in this function, i.e. there are no updates from the jobs_db applied to the input jobs. To recover jobs from the jobs_db, use - :py:meth:`AWSBatchDB.find_jobs_to_run()` - :py:meth:`AWSBatchDB.jobs_db.all()` """ # Avoid side-effects in this function: # - treat the job as a read-only object # - treat any jobs_db records as read-only objects # - if a job is not saved, don't save it to jobs_db jobs_outstanding = [] for job in jobs: # First check the job itself before checking the jobs_db if job.job_id and job.status == "SUCCEEDED": LOGGER.debug(job.job_description) continue # TODO: use a common jobs-db instance for all these db_job = await self.find_latest_job_name(job.job_name) if db_job: LOGGER.debug( "AWS batch-db (%s:%s) status: %s", db_job.job_name, db_job.job_id, db_job.status, ) if db_job.job_id and db_job.status == "SUCCEEDED": LOGGER.debug(db_job.job_description) continue jobs_outstanding.append(job) return jobs_outstanding
[docs] async def jobs_recovery(self, jobs: List[AWSBatchJob]) -> List[AWSBatchJob]: """ Use the job.job_name to find any jobs_db records to recover job data. """ jobs_recovered = [] for job in jobs: db_job = await self.find_latest_job_name(job.job_name) if db_job: LOGGER.debug( "AWS batch-db (%s:%s) status: %s", db_job.job_name, db_job.job_id, db_job.status, ) jobs_recovered.append(db_job) else: jobs_recovered.append(job) return jobs_recovered
[docs]@dataclass class AioAWSBatchTinyDB(AioAWSBatchDB): """ AWS Batch job databases - TinyDB implementation The jobs and logs are kept in separate DBs so that performance on the jobs_db is not compromised by too much data from job logs. This makes it possible to use the jobs_db without capturing logs as well. .. seealso:: https://tinydb.readthedocs.io/en/latest/ The batch data is a `TinyDB`_ json file, e.g. .. code-block:: >>> import json >>> jobs_db_file = '/tmp/aio_batch_jobs.json' >>> logs_db_file = '/tmp/aio_batch_logs.json' >>> with open(jobs_db_file) as job_file: ... batch_data = json.load(job_file) ... >>> len(batch_data['aws-batch-jobs']) 5 >>> import tinydb >>> tinydb.TinyDB.DEFAULT_TABLE = "aws-batch-jobs" >>> tinydb.TinyDB.DEFAULT_TABLE_KWARGS = {"cache_size": 0} >>> # the jobs and logs are kept in separate DBs so >>> # that performance on the jobs_db is not compromised >>> # by too much data from job logs. >>> jobs_db = tinydb.TinyDB(jobs_db_file) >>> logs_db = tinydb.TinyDB(logs_db_file) """ #: a file used for :py:class::`TinyDB(jobs_db_file)` jobs_db_file: str = "/tmp/aws_batch_jobs.json" #: a file used for :py:class::`TinyDB(logs_db_file)` logs_db_file: str = "/tmp/aws_batch_logs.json" def __post_init__(self): tinydb.TinyDB.DEFAULT_TABLE = "aws-batch-jobs" tinydb.TinyDB.DEFAULT_TABLE_KWARGS = {"cache_size": 0} # the jobs and logs are kept in separate DBs so # that performance on the jobs_db is not compromised # by too much data from job logs. self.jobs_db = tinydb.TinyDB(self.jobs_db_file) self.logs_db = tinydb.TinyDB(self.logs_db_file) LOGGER.info("Using jobs-db file: %s", self.jobs_db_file) LOGGER.info("Using logs-db file: %s", self.logs_db_file) # Lazy init for any asyncio instances self._db_sem = None
[docs] @classmethod def get_paged_batch_db(cls, db_path: Path, offset: int, limit: int): db_page = f"{offset:06d}_{offset + limit:06d}" jobs_db_file = str(db_path / f"aio_batch_jobs_{db_page}.json") logs_db_file = str(db_path / f"aio_batch_logs_{db_page}.json") return cls(jobs_db_file=jobs_db_file, logs_db_file=logs_db_file)
@property def db_semaphore(self) -> asyncio.Semaphore: """A semaphore to limit requests to the db""" if self._db_sem is None: self._db_sem = asyncio.Semaphore() return self._db_sem
[docs] async def all_jobs(self) -> List[AWSBatchJob]: """ Collect all jobs. Warning: this could exceed memory, try to use the :py:meth:`gen_all_jobs` wherever possible. """ jobs = [] async for j in self.gen_all_jobs(): jobs.append(j) return jobs
[docs] async def gen_all_jobs(self) -> AsyncIterator[AWSBatchJob]: """ Generate all jobs. """ async with self.db_semaphore: for job_doc in self.jobs_db.all(): job_id = job_doc.get("job_id", "") if valid_uuid4(job_id): yield AWSBatchJob(**job_doc)
[docs] async def all_job_ids(self) -> Set[str]: """ Collect all jobIds. """ job_ids = set() async for job_id in self.gen_job_ids(): job_ids.add(job_id) return job_ids
[docs] async def gen_job_ids(self) -> AsyncIterator[str]: """ Generate all jobIds. """ async with self.db_semaphore: for job_doc in self.jobs_db.all(): job_id = job_doc.get("job_id", "") if valid_uuid4(job_id): yield job_id
[docs] async def count_by_job_status(self) -> Counter: """ Count all jobs by jobStatus :return: a Counter of jobs by job status (could contain multiple entries for the same jobName, if it is run more than once) """ counter = Counter() for job_status in AWSBatchJobStates: counter[job_status.name] = 0 async with self.db_semaphore: for job_doc in self.jobs_db.all(): job = AWSBatchJob(**job_doc) counter[job.status] += 1 return counter
[docs] async def group_by_job_status(self) -> Dict[str, List[Tuple[str, str, str]]]: """ Group all jobId by jobStatus :return: a dictionary of job info by job status (could contain multiple entries with the same jobName if it is run multiple times); the dictionary values contain a list of job information tuples, e.g. .. code-block:: { status: [ (job.job_id, job.job_name, job.status), ] } """ groups = defaultdict(list) for job_status in AWSBatchJobStates: groups[job_status.name] = [] async with self.db_semaphore: for job_doc in self.jobs_db.all(): job = AWSBatchJob(**job_doc) job_info = (job.job_id, job.job_name, job.status) groups[job.status].append(job_info) return groups
[docs] async def find_by_job_id(self, job_id: str) -> Optional[tinydb.database.Document]: """ Find one job by the jobId :param job_id: a batch jobId :return: the :py:meth:`AWSBatchJob.job_data` or None """ if job_id: async with self.db_semaphore: job_query = tinydb.Query() return self.jobs_db.get(job_query.job_id == job_id)
[docs] async def find_by_job_name(self, job_name: str) -> List[tinydb.database.Document]: """ Find any jobs matching the jobName :param job_name: a batch jobName :return: a list of documents containing :py:meth:`AWSBatchJob.job_data` """ if job_name: async with self.db_semaphore: job_query = tinydb.Query() return self.jobs_db.search(job_query.job_name == job_name)
[docs] async def find_latest_job_name(self, job_name: str) -> Optional[AWSBatchJob]: """ Find the latest job matching the jobName, based on the "createdAt" time stamp. :param job_name: a batch jobName :return: the latest job record available """ jobs_saved = await self.find_by_job_name(job_name) if jobs_saved: db_jobs = [AWSBatchJob(**job_doc) for job_doc in jobs_saved] db_jobs = sorted(db_jobs, key=lambda j: j.created or j.submitted) db_job = db_jobs[-1] return db_job
[docs] async def find_by_job_status(self, job_states: List[str]) -> List[AWSBatchJob]: """ Find any jobs matching any jobStatus values :param job_states: a list of valid job status values :return: a list of AWSBatchJob (could contain multiple entries with the same jobName if it is run multiple times with any jobStatus in the jobStatus values) """ for status in job_states: assert status in AWSBatchJob.STATES jobs = [] async with self.db_semaphore: for job_doc in self.jobs_db.all(): job = AWSBatchJob(**job_doc) if job.status in job_states: jobs.append(job) return jobs
[docs] async def remove_by_job_id(self, job_id: str) -> Optional[tinydb.database.Document]: """ Remove any job matching the jobId :param job_id: a batch jobId :return: a deleted document """ if job_id: doc = await self.find_by_job_id(job_id) if doc: removed_ids = self.jobs_db.remove(doc_ids=[doc.doc_id]) if removed_ids: return doc
[docs] async def remove_by_job_name(self, job_name: str) -> Set[str]: """ Remove any jobs matching the jobName :param job_name: a batch jobName :return: a set of deleted job-ids """ if job_name: jobs_found = await self.find_by_job_name(job_name) if jobs_found: jobs_by_doc_id = {doc.doc_id: doc["job_id"] for doc in jobs_found} doc_ids = list(jobs_by_doc_id.keys()) async with self.db_semaphore: removed_ids = self.jobs_db.remove(doc_ids=doc_ids) return set([jobs_by_doc_id[doc_id] for doc_id in removed_ids])
[docs] async def save_job(self, job: AWSBatchJob) -> Optional[str]: """ Insert or update a job (if it has a job_id) :param job: an AWSBatchJob :return: the jobId if the job is saved """ if job.job_id: async with self.db_semaphore: job_query = tinydb.Query() doc_ids = self.jobs_db.upsert( job.db_data, job_query.job_id == job.job_id ) if doc_ids: return job.job_id else: LOGGER.error("FAIL to save_job without job_id")
[docs] async def find_job_logs(self, job_id: str) -> Optional[tinydb.database.Document]: """ Find job logs by job_id :param job_id: str :return: a tinydb.database.Document with job logs """ if job_id: async with self.db_semaphore: log_query = tinydb.Query() return self.logs_db.get(log_query.job_id == job_id) else: LOGGER.error("FAIL to find_job_logs without job_id")
[docs] async def save_job_logs(self, job: AWSBatchJob) -> Optional[str]: """ Insert or update job logs (if the job has a job_id) :param job: an AWSBatchJob :return: a job.job_id if the logs are saved """ if job.job_id: async with self.db_semaphore: log_query = tinydb.Query() doc_ids = self.logs_db.upsert( job.db_logs_data, log_query.job_id == job.job_id ) if doc_ids: return job.job_id else: LOGGER.error("FAIL to save_job_logs without job_id")
[docs] async def find_jobs_to_run(self) -> List[AWSBatchJob]: """ Find all jobs that have not SUCCEEDED. Note that any jobs handled by the job-manager will not re-run if they have a job.job_id, those jobs will be monitored until complete. """ async with self.db_semaphore: jobs = [AWSBatchJob(**job_doc) for job_doc in self.jobs_db.all()] jobs_outstanding = [] for job in jobs: LOGGER.info( "AWS batch-db (%s:%s) status: %s", job.job_name, job.job_id, job.status, ) if job.job_id and job.status == "SUCCEEDED": LOGGER.debug(job.job_description) continue jobs_outstanding.append(job) return jobs_outstanding
[docs] async def jobs_to_run(self, jobs: List[AWSBatchJob]) -> List[AWSBatchJob]: """ Use the job.job_name to find any jobs_db records and check the job status on the latest submission of any job matching the job-name. For any job that has a matching job-name in the jobs_db, check whether the job has already run and SUCCEEDED. Any jobs that have already run and SUCCEEDED are not returned. Return all jobs that have not run yet or have not SUCCEEDED. Note that any jobs subsequently input to the job-manager will not re-run if they already have a job.job_id, those jobs will be monitored until complete. To re-run jobs that are returned from this filter function, first use `job.reset()` before passing the jobs to the job-manager. .. note:: The input jobs are not modified in any way in this function, i.e. there are no updates from the jobs_db applied to the input jobs. To recover jobs from the jobs_db, use - :py:meth:`AWSBatchDB.find_jobs_to_run()` - :py:meth:`AWSBatchDB.jobs_db.all()` """ jobs_outstanding = [] for job in jobs: # Avoid side-effects in this function: # - treat the job as a read-only object # - treat any jobs_db records as read-only objects # - if a job is not saved, don't save it to jobs_db # First check the job itself before checking the jobs_db if job.job_id and job.status == "SUCCEEDED": LOGGER.debug(job.job_description) continue db_job = await self.find_latest_job_name(job.job_name) if db_job: LOGGER.info( "AWS batch-db (%s:%s) status: %s", db_job.job_name, db_job.job_id, db_job.status, ) if db_job.job_id and db_job.status == "SUCCEEDED": LOGGER.debug(db_job.job_description) continue jobs_outstanding.append(job) return jobs_outstanding
[docs] async def jobs_recovery(self, jobs: List[AWSBatchJob]) -> List[AWSBatchJob]: """ Use the job.job_name to find any jobs_db records to recover job data. """ jobs_recovered = [] for job in jobs: db_job = await self.find_latest_job_name(job.job_name) if db_job: LOGGER.info( "AWS batch-db (%s:%s) status: %s", db_job.job_name, db_job.job_id, db_job.status, ) jobs_recovered.append(db_job) else: jobs_recovered.append(job) return jobs_recovered