"""Upload model artifacts to S3"""
from __future__ import absolute_import
import logging
import os
import tempfile
import botocore
import boto3
import tqdm
from sagemaker.core.helper.session_helper import Session
from sagemaker.core.s3.utils import s3_path_join
from sagemaker.core.s3 import S3Uploader
from sagemaker.core.common_utils import create_tar_file
logger = logging.getLogger(__name__)
# Minimum size required for multi-part uploads
BUF_SIZE = 5 * 1024 * 1024
def _get_dir_size(path):
    """Return the total size in bytes of all regular files under ``path``.

    Directories are traversed recursively; ``entry.is_file()``/``entry.is_dir()``
    follow symlinks (same semantics as the stdlib defaults). Entries that are
    neither files nor directories (e.g. broken symlinks, FIFOs) contribute 0.
    """
    with os.scandir(path) as entries:
        return sum(
            entry.stat().st_size if entry.is_file() else _get_dir_size(entry.path)
            for entry in entries
            if entry.is_file() or entry.is_dir()
        )
class Uploader(object):
    """Uploader class that handles uploading data to S3 and display progress bar.

    Progress is reported through a ``tqdm`` bar: boto3 / ``S3Uploader`` invoke
    :meth:`observe` once per transferred chunk, and any remainder (e.g. bytes
    counted in ``total_size`` that the transfer callback never reported) is
    flushed to the bar at the end so it always reaches 100%.
    """

    def __init__(self) -> None:
        # Bytes not yet reported to the progress bar; set at the start of an upload.
        self.total_left = None
        # Active tqdm progress bar; only valid while an upload method is running.
        self.pbar = None

    def observe(self, bytes_amount):
        """Transfer callback: record ``bytes_amount`` uploaded and advance the bar.

        Args:
            bytes_amount (int): Number of bytes transferred since the last call.
        """
        self.total_left -= bytes_amount
        self.pbar.update(bytes_amount)

    def upload(
        self,
        model_dir,
        total_size: int,
        credentials: botocore.credentials.Credentials,
        region_name: str,
        bucket: str,
        key: str,
    ):
        """Compress ``model_dir`` into ``model.tar.gz`` and upload it to S3.

        Args:
            model_dir: Local directory whose top-level entries are tarred.
            total_size (int): Expected byte count, used to size the progress bar.
            credentials (botocore.credentials.Credentials): AWS credentials for
                the boto3 session used to upload.
            region_name (str): AWS region of the target bucket.
            bucket (str): Destination S3 bucket name.
            key (str): Destination S3 object key.
        """
        self.total_left = total_size
        with tqdm.tqdm(
            total=total_size, desc="Uploading model artifacts", unit="bytes", ncols=100
        ) as self.pbar:
            files = [os.path.join(model_dir, name) for name in os.listdir(model_dir)]
            # Use TemporaryDirectory so the scratch dir (and the tar inside it)
            # is removed even if the upload raises; mkdtemp + os.remove leaked
            # the directory and left the tar behind on failure.
            with tempfile.TemporaryDirectory() as temp:
                tar_path = os.path.join(temp, "model.tar.gz")
                create_tar_file(files, tar_path)
                s3 = boto3.session.Session(
                    region_name=region_name,
                    aws_access_key_id=credentials.access_key,
                    aws_secret_access_key=credentials.secret_key,
                    aws_session_token=credentials.token,
                ).client("s3")
                s3.upload_file(tar_path, bucket, key, Callback=self.observe)
            # Flush any bytes the transfer callback did not report (e.g. tar
            # overhead vs. the raw directory size) so the bar completes.
            self.pbar.update(self.total_left)
        self.pbar = None

    def upload_uncompressed(
        self,
        model_dir: str,
        sagemaker_session: Session,
        bucket: str,
        key_prefix: str,
        total_size: int,
    ):
        """Upload the contents of ``model_dir`` to S3 without compressing them.

        Args:
            model_dir (str): Local directory to upload file-by-file.
            sagemaker_session (Session): Session providing the boto3 client.
            bucket (str): Destination S3 bucket name.
            key_prefix (str): Destination key prefix under the bucket.
            total_size (int): Expected byte count, used to size the progress bar.
        """
        self.total_left = total_size
        with tqdm.tqdm(
            total=total_size, desc="Uploading model artifacts", unit="bytes", ncols=100
        ) as self.pbar:
            S3Uploader.upload(
                local_path=model_dir,
                desired_s3_uri=s3_path_join("s3://", bucket, key_prefix),
                sagemaker_session=sagemaker_session,
                callback=self.observe,
            )
            # Flush unreported bytes so the bar reaches 100% before closing.
            self.pbar.update(self.total_left)
def upload(sagemaker_session: Session, model_dir: str, bucket: str, key_prefix: str) -> str:
    """Wrapper function of method upload.

    Tars ``model_dir`` and uploads it as ``<key_prefix>/serve.tar.gz``,
    returning the S3 URI of the uploaded object.
    """
    boto_session = sagemaker_session.boto_session
    destination_key = key_prefix + "/serve.tar.gz"
    Uploader().upload(
        model_dir,
        _get_dir_size(model_dir),
        boto_session.get_credentials(),
        boto_session.region_name,
        bucket,
        destination_key,
    )
    return s3_path_join("s3://", bucket, destination_key)
def upload_uncompressed(
    sagemaker_session: Session, model_dir: str, bucket: str, key_prefix: str
) -> str:
    """Wrapper function of method upload_uncompressed.

    Uploads ``model_dir`` file-by-file under ``key_prefix`` and returns the
    S3 URI of the destination prefix (with a trailing slash).
    """
    Uploader().upload_uncompressed(
        model_dir,
        sagemaker_session,
        bucket,
        key_prefix,
        _get_dir_size(model_dir),
    )
    return s3_path_join("s3://", bucket, key_prefix, with_end_slash=True)