# Source code for aio_aws.s3_io

# Copyright 2019-2021 Darren Weber
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
S3 IO
=====

These functions provide blocking serialization and deserialization
of GeoJSON, GeoJSONSeq, JSON, and YAML data to and from s3 objects
(and, for GeoJSONSeq, to local files).

"""

import concurrent.futures
import json
import os
import tempfile
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import boto3
import yaml
from botocore.client import BaseClient
from botocore.config import Config
from botocore.exceptions import ClientError

from aio_aws.logger import get_logger
from aio_aws.s3_uri import S3URI
from aio_aws.s3_uri import S3Info
from aio_aws.utils import response_success

LOGGER = get_logger(__name__)


def s3_io_client() -> BaseClient:
    """
    Create a botocore s3 client with a standard retry policy.

    The client uses urllib3 with a default connection pool, which should
    be thread-safe. It is configured with a botocore.config.Config that
    allows up to 3 attempts using the "standard" retry mode.

    :return: a botocore.client.BaseClient for s3
    """
    retry_policy = {"max_attempts": 3, "mode": "standard"}
    client_config = Config(retries=retry_policy)
    return boto3.client("s3", config=client_config)
def s3_file_info(s3_uri: Union[S3URI, str], s3_client: BaseClient = None) -> S3Info:
    """
    Collect metadata for an s3 object using an S3 HEAD request.

    :param s3_uri: a fully qualified S3 URI (an S3URI or a str) for the
        s3 object to inspect
    :param s3_client: an optional botocore.client.BaseClient for s3; a new
        client is created when this is None
    :return: an S3Info object; when the HEAD request succeeds it carries
        ``last_modified`` and ``s3_size``, and on a ClientError it is
        returned without any HEAD data
    """
    client = s3_io_client() if s3_client is None else s3_client
    uri = S3URI(s3_uri) if isinstance(s3_uri, str) else s3_uri
    info = S3Info(s3_uri=uri)
    try:
        head = client.head_object(Bucket=uri.bucket, Key=uri.key)
        if response_success(head):
            # botocore parses LastModified into a datetime.datetime
            info.last_modified = head["LastModified"]
            info.s3_size = int(head["ContentLength"])
            LOGGER.debug("Success S3URI info: %s", uri)
    except ClientError as err:
        LOGGER.debug("Failed S3URI info: %s", uri)
        LOGGER.debug(err)
    return info
def s3_files_info(
    s3_uris: List[Union[S3URI, str]], s3_client: BaseClient = None
) -> List[S3Info]:
    """
    Collect S3 HEAD data for many s3 objects concurrently.

    Each URI is passed to ``s3_file_info`` on a thread pool, and all
    threads share one s3 client.

    :param s3_uris: fully qualified S3 URIs (S3URI or str) for the s3 objects
    :param s3_client: an optional botocore.client.BaseClient for s3; a new
        client is created when this is None and shared by all threads
    :return: a list of S3Info objects in the same order as ``s3_uris``;
        a URI whose lookup raised an exception is logged and omitted
    """
    if s3_client is None:
        s3_client = s3_io_client()
    s3_files = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Keep submission order so the results line up with the input URIs
        # (as_completed would return them in a nondeterministic order).
        submitted = [
            (s3_uri, executor.submit(s3_file_info, s3_uri, s3_client))
            for s3_uri in s3_uris
        ]
        for s3_uri, future in submitted:
            try:
                s3_files.append(future.result())
            except Exception as err:
                # Best-effort: record which URI failed, then keep collecting.
                LOGGER.error("Failed S3URI info: %s", s3_uri)
                LOGGER.error(err)
    return s3_files
def get_s3_content(s3_uri: str, s3_client: BaseClient = None):
    """
    Read an s3 object and decode its content as UTF-8 text.

    :param s3_uri: a fully qualified S3 URI for the s3 object to read
    :param s3_client: an optional botocore.client.BaseClient for s3; a new
        client is created when this is None
    :return: the decoded text of the s3 object, or None when the GET
        request fails with a ClientError (the error is logged)
    """
    if s3_client is None:
        s3_client = s3_io_client()
    try:
        s3_uri = S3URI(s3_uri)
        LOGGER.info("Read s3-uri: %s", s3_uri.s3_uri)
        content_object = s3_client.get_object(Bucket=s3_uri.bucket, Key=s3_uri.key)
        return content_object["Body"].read().decode("utf-8")
    except ClientError as err:
        # This is a read, so report a GET failure (not a PUT).
        LOGGER.error("Failed S3 GET from: %s", s3_uri)
        LOGGER.error(err)
        return None
def put_s3_content(
    data_file: str, s3_uri: str, s3_client: BaseClient = None
) -> Optional[str]:
    """
    Upload a local file to an s3 URI.

    After a successful PUT, a boto3 "object_exists" waiter confirms that the
    object is visible before the URI is returned.

    :param data_file: path to a local data file, read in binary mode
    :param s3_uri: a fully qualified S3 URI for the s3 object to write
    :param s3_client: an optional botocore.client.BaseClient for s3; a new
        client is created when this is None
    :return: the s3 URI on success; None when the PUT response is not
        successful or a ClientError is raised (both cases are logged).
        Other exceptions (e.g. from opening the file or from the waiter)
        are not caught here.
    """
    if s3_client is None:
        s3_client = s3_io_client()
    s3_uri = S3URI(s3_uri)
    try:
        with open(data_file, "rb") as fd:
            response = s3_client.put_object(
                Bucket=s3_uri.bucket, Key=s3_uri.key, Body=fd
            )
        if not response_success(response):
            # Previously this path returned None without any log entry.
            LOGGER.error("Failed S3 PUT to: %s", s3_uri)
            return None
        # Use a boto3 waiter to confirm the upload is visible
        exists_waiter = s3_client.get_waiter("object_exists")
        exists_waiter.wait(Bucket=s3_uri.bucket, Key=s3_uri.key)
        return str(s3_uri)
    except ClientError as err:
        LOGGER.error("Failed S3 PUT to: %s", s3_uri)
        LOGGER.error(err)
        return None
def json_s3_dump(
    json_data: Any, s3_uri: str, s3_client: BaseClient = None
) -> Optional[str]:
    """
    Write JSON to an s3 URI.

    The data is serialized to a temporary local file, which is uploaded
    with ``put_s3_content`` and then deleted.

    :param json_data: an object to json.dump
    :param s3_uri: a fully qualified S3 URI for the s3 object to write
    :param s3_client: an optional botocore.client.BaseClient for s3
    :return: the s3 URI on success, otherwise None (the failure is logged)
    """
    s3_uri = S3URI(s3_uri)
    success = False
    tmp_file = None
    try:
        # Write through the temp file's own text handle and close it before
        # uploading. Re-opening a NamedTemporaryFile by name while it is
        # still open is not portable (it fails on Windows).
        with tempfile.NamedTemporaryFile(
            mode="w", encoding="utf-8", delete=False
        ) as o_file:
            tmp_file = o_file.name
            json.dump(obj=json_data, fp=o_file)
        s3_obj = put_s3_content(
            data_file=tmp_file, s3_uri=str(s3_uri), s3_client=s3_client
        )
        success = bool(s3_obj)
    finally:
        # delete=False above, so the temp file must be removed explicitly
        if tmp_file:
            os.unlink(tmp_file)
    if success:
        LOGGER.info("Saved S3URI: %s", str(s3_uri))
        return str(s3_uri)
    LOGGER.error("Failed to save S3URI: %s", str(s3_uri))
    return None
def json_s3_load(s3_uri: str, s3_client: BaseClient = None) -> Any:
    """
    Deserialize JSON text stored in an s3 object.

    :param s3_uri: a fully qualified S3 URI for the s3 object to read
    :param s3_client: an optional botocore.client.BaseClient for s3
    :return: the Python object produced by json.loads on the object's text

    NOTE(review): when the read fails, get_s3_content yields None, and
    json.loads rejects that with a TypeError.
    """
    return json.loads(get_s3_content(s3_uri, s3_client=s3_client))
def geojson_s3_load(s3_uri: str, s3_client: BaseClient = None) -> Dict:
    """
    Load a GeoJSON document from an s3 object.

    GeoJSON is plain JSON, so the object's text is parsed with json.loads.

    :param s3_uri: a fully qualified S3 URI for the s3 object to read
    :param s3_client: an optional botocore.client.BaseClient for s3
    :return: the parsed geojson data
    """
    raw_text = get_s3_content(s3_uri, s3_client=s3_client)
    parsed = json.loads(raw_text)
    return parsed
def geojson_s3_dump(
    geojson_data: Any, s3_uri: str, s3_client: BaseClient = None
) -> Optional[str]:
    """
    Write a GeoJSON object to an s3 URI.

    GeoJSON is plain JSON, so this is a thin alias for ``json_s3_dump``.

    :param geojson_data: an object to json.dump
    :param s3_uri: a fully qualified S3 URI for the s3 object to write
    :param s3_client: an optional botocore.client.BaseClient for s3
    :return: the s3 URI on success
    """
    saved_uri = json_s3_dump(geojson_data, s3_uri, s3_client=s3_client)
    return saved_uri
def geojsons_s3_load(s3_uri: str, s3_client: BaseClient = None) -> List[Dict]:
    """
    Read GeoJSON Text Sequence data from an s3 object.

    Each non-blank line is parsed as one JSON feature. Blank and
    whitespace-only lines are skipped.

    :param s3_uri: a fully qualified S3 URI for the s3 object to read
    :param s3_client: an optional botocore.client.BaseClient for s3
    :return: geojson features, in file order

    .. seealso::
        - https://tools.ietf.org/html/rfc8142
    """
    file_content = get_s3_content(s3_uri, s3_client=s3_client)
    # A single pass replaces the old ``list.pop(0)`` loop, which was O(n^2).
    # str.splitlines also splits on the RFC 8142 record separator (0x1E),
    # and str.strip removes it.
    stripped_lines = (line.strip() for line in file_content.splitlines())
    return [json.loads(line) for line in stripped_lines if line]
def geojsons_s3_dump(
    geojson_features: List[Dict], s3uri: str, s3_client: BaseClient = None
) -> Optional[str]:
    """
    Write GeoJSON Text Sequence files to an s3 URI.

    `GeoJSON Text Sequences <https://tools.ietf.org/html/rfc8142>`_ are lines
    of geojson features that are designed for streaming operations on large
    datasets. These files can be loaded by geopandas, using fiona
    `driver="GeoJSONSeq"`, which can be auto-detected. For example:

    .. code-block::

        import geopandas as gpd

        s3_uri = "s3://your-bucket/prefix/input.geojsons"
        gdf = gpd.read_file(s3_uri)

    Each feature is written with json.dump followed by a newline to a
    temporary local file. That file is uploaded with ``put_s3_content`` and
    then deleted.

    :param geojson_features: a list of geojson features; from any
        feature collection, this is geojson_collection["features"]
    :param s3uri: a fully qualified S3 URI for the s3 object to write
    :param s3_client: an optional botocore.client.BaseClient for s3
    :return: the s3 URI on success, otherwise None (the failure is logged)
    """
    if s3_client is None:
        s3_client = s3_io_client()
    s3_uri = S3URI(s3uri)
    success = False
    tmp_file = None
    try:
        # Write through the temp file's own text handle and close it before
        # uploading. Re-opening a NamedTemporaryFile by name while it is
        # still open is not portable (it fails on Windows).
        with tempfile.NamedTemporaryFile(
            mode="w", encoding="utf-8", delete=False
        ) as o_file:
            tmp_file = o_file.name
            for feature in geojson_features:
                json.dump(feature, o_file)
                o_file.write("\n")
        s3_obj = put_s3_content(
            data_file=tmp_file, s3_uri=str(s3_uri), s3_client=s3_client
        )
        success = bool(s3_obj)
    finally:
        # delete=False above, so the temp file must be removed explicitly
        if tmp_file:
            os.unlink(tmp_file)
    if success:
        LOGGER.info("Saved GeoJSONSeq to %s", str(s3_uri))
        return str(s3_uri)
    LOGGER.error("Failed to save GeoJSONSeq to %s", s3_uri)
    return None
def geojsons_dump(
    geojson_features: List[Dict], geojsons_file: Union[Path, str]
) -> Optional[Union[Path, str]]:
    """
    Write GeoJSON features to a local GeoJSON Text Sequence file.

    Each feature is serialized with json.dump and followed by a newline.
    The write counts as successful only if the resulting path is a regular
    file with a non-zero size. An empty feature list therefore writes an
    empty file and is reported as a failure.

    :param geojson_features: a list of geojson features; from any
        feature collection, this is geojson_collection["features"]
    :param geojsons_file: a file path to write
    :return: the geojsons_file argument if the dump succeeds, otherwise None
    """
    with open(geojsons_file, "w") as sink:
        for item in geojson_features:
            json.dump(item, sink)
            sink.write("\n")

    written = Path(geojsons_file)
    if not (written.is_file() and written.stat().st_size > 0):
        LOGGER.error("Failed to save GeoJSONSeq to %s", geojsons_file)
        return None
    LOGGER.info("Saved GeoJSONSeq to %s", geojsons_file)
    return geojsons_file
def yaml_s3_dump(
    yaml_data: Any, s3_uri: str, s3_client: BaseClient = None
) -> Optional[str]:
    """
    Write YAML to an s3 URI.

    The data is serialized with yaml.safe_dump to a temporary local file,
    which is uploaded with ``put_s3_content`` and then deleted.

    :param yaml_data: an object to yaml.safe_dump
    :param s3_uri: a fully qualified S3 URI for the s3 object to write
    :param s3_client: an optional botocore.client.BaseClient for s3
    :return: the s3 URI on success, otherwise None (the failure is logged)
    """
    s3_uri = S3URI(s3_uri)
    success = False
    tmp_file = None
    try:
        # Write through the temp file's own text handle and close it before
        # uploading. Re-opening a NamedTemporaryFile by name while it is
        # still open is not portable (it fails on Windows).
        with tempfile.NamedTemporaryFile(
            mode="w", encoding="utf-8", delete=False
        ) as o_file:
            tmp_file = o_file.name
            yaml.safe_dump(yaml_data, o_file)
        s3_obj = put_s3_content(
            data_file=tmp_file, s3_uri=str(s3_uri), s3_client=s3_client
        )
        success = bool(s3_obj)
    finally:
        # delete=False above, so the temp file must be removed explicitly
        if tmp_file:
            os.unlink(tmp_file)
    if success:
        LOGGER.info("Saved S3URI: %s", str(s3_uri))
        return str(s3_uri)
    LOGGER.error("Failed to save S3URI: %s", str(s3_uri))
    return None
def yaml_s3_load(s3_uri: str, s3_client: BaseClient = None) -> Any:
    """
    Load YAML from an s3 URI.

    The object's text is parsed with yaml.safe_load, which only constructs
    plain Python types.

    :param s3_uri: a fully qualified S3 URI for the s3 object to read
    :param s3_client: an optional botocore.client.BaseClient for s3
    :return: data from the yaml load
    """
    file_content = get_s3_content(s3_uri, s3_client=s3_client)
    return yaml.safe_load(file_content)