"""Module for the MultiModel Local and Remote servers"""
from __future__ import absolute_import
import requests
import logging
import platform
from pathlib import Path
from sagemaker.core.helper.session_helper import Session
from sagemaker.core import fw_utils
from sagemaker.core.s3.utils import determine_bucket_and_prefix, parse_s3_url, s3_path_join
from sagemaker.core.s3 import S3Uploader
from sagemaker.core.local.local_session import get_docker_host
from sagemaker.core.common_utils import _is_s3_uri
# Path inside the serving container where the local model "code" directory
# is bind-mounted by LocalMultiModelServer._start_serving.
MODE_DIR_BINDING = "/opt/ml/model/"
# Baseline environment variables that _update_env_vars overlays caller values
# onto; currently empty.
_DEFAULT_ENV_VARS = {}
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class LocalMultiModelServer:
    """Local Multi Model server instance"""

    def _start_serving(
        self,
        client: object,
        image: str,
        model_path: str,
        secret_key: str,
        env_vars: dict,
    ):
        """Start the serving container and store its handle on ``self.container``.

        Args:
            client: Container client; only ``client.containers.run`` is called
                (presumably a Docker SDK client -- confirm against the caller).
            image: Image to run with the ``serve`` command.
            model_path: Local model directory. Its ``code`` sub-directory is
                mounted read-write at ``MODE_DIR_BINDING`` inside the container.
            secret_key: Not used by this method; kept for signature parity.
            env_vars: Optional extra environment variables for the container.
                The passed dict is left untouched. Keys that collide with the
                server-managed variables below are overridden by them.
        """
        # Copy instead of calling ``env_vars.update(...)`` so the caller's dict
        # is never mutated as a side effect of starting the container.
        container_env = dict(env_vars) if env_vars else {}
        # Server-managed variables are applied last, so they take precedence
        # over any caller-supplied value for the same key.
        container_env.update(
            {
                "SAGEMAKER_SUBMIT_DIRECTORY": "/opt/ml/model/code",
                "SAGEMAKER_PROGRAM": "inference.py",
                "LOCAL_PYTHON": platform.python_version(),
            }
        )

        self.container = client.containers.run(
            image,
            "serve",
            # network_mode="host",
            ports={"8080/tcp": 8080},
            detach=True,
            auto_remove=True,
            volumes={
                Path(model_path).joinpath("code"): {
                    "bind": MODE_DIR_BINDING,
                    "mode": "rw",
                },
            },
            environment=container_env,
        )

    def _invoke_multi_model_server_serving(self, request: object, content_type: str, accept: str):
        """Invokes MMS server by hitting the docker host

        Sends ``request`` as the body of a POST to port 8080 of the docker
        host at ``/invocations``, with a 600 second timeout.

        Args:
            request: Payload passed as the ``data`` of the POST.
            content_type: Value for the ``Content-Type`` header.
            accept: Value for the ``Accept`` header.

        Returns:
            The raw response body (``response.content``).

        Raises:
            Exception: If the request cannot be sent or the server answers
                with an error status. The underlying error is chained as the
                cause.
        """
        try:
            response = requests.post(
                f"http://{get_docker_host()}:8080/invocations",
                data=request,
                headers={"Content-Type": content_type, "Accept": accept},
                timeout=600,
            )
            response.raise_for_status()
            return response.content
        except Exception as e:
            raise Exception("Unable to send request to the local container server") from e
class SageMakerMultiModelServer:
    """Sagemaker endpoint Multi Model Server"""

    def _upload_server_artifacts(
        self,
        model_path: str,
        secret_key: str,
        sagemaker_session: Session,
        s3_model_data_url: str = None,
        image: str = None,
        env_vars: dict = None,
        should_upload_artifacts: bool = False,
    ):
        """Resolve the model data source and environment variables for an endpoint.

        If ``model_path`` is already an S3 URI it is used directly. Otherwise,
        when ``should_upload_artifacts`` is true, the ``code`` sub-directory of
        ``model_path`` is uploaded uncompressed to S3 and the upload location
        is used. In every other case no model data source is produced.

        Args:
            model_path: S3 URI of the model artifacts, or a local model
                directory containing a ``code`` sub-directory.
            secret_key: When truthy, the returned environment is built from
                the fixed serving variables instead of ``env_vars``.
            sagemaker_session: Session used to resolve the bucket and prefix,
                to perform the upload and to read the region name.
            s3_model_data_url: Optional S3 URL whose bucket and key prefix
                seed the upload location.
            image: Image URI passed to ``fw_utils.model_code_key_prefix``.
            env_vars: Optional caller-supplied environment variables.
            should_upload_artifacts: Whether a local ``model_path`` should be
                uploaded to S3.

        Returns:
            A ``(model_data, env_vars)`` tuple. ``model_data`` is an
            ``S3DataSource`` dict (uncompressed, ``S3Prefix``) or ``None`` when
            no S3 location was resolved. ``env_vars`` is a new dict produced by
            ``_update_env_vars``.
        """
        model_data_url = None
        if _is_s3_uri(model_path):
            model_data_url = model_path
        elif should_upload_artifacts:
            if s3_model_data_url:
                bucket, key_prefix = parse_s3_url(url=s3_model_data_url)
            else:
                bucket, key_prefix = None, None

            code_key_prefix = fw_utils.model_code_key_prefix(key_prefix, None, image)
            bucket, code_key_prefix = determine_bucket_and_prefix(
                bucket=bucket, key_prefix=code_key_prefix, sagemaker_session=sagemaker_session
            )

            code_dir = Path(model_path).joinpath("code")
            s3_location = s3_path_join("s3://", bucket, code_key_prefix, "code")
            logger.debug("Uploading Multi Model Server Resources uncompressed to: %s", s3_location)

            model_data_url = S3Uploader.upload(
                str(code_dir),
                s3_location,
                None,
                sagemaker_session,
            )

        model_data = None
        if model_data_url:
            # The prefix must end with exactly one "/". Appending blindly turned
            # a URI that already ended in "/" into ".../prefix//", which names a
            # different (empty) key prefix.
            if model_data_url.endswith("/"):
                s3_uri = model_data_url
            else:
                s3_uri = model_data_url + "/"
            model_data = {
                "S3DataSource": {
                    "CompressionType": "None",
                    "S3DataType": "S3Prefix",
                    "S3Uri": s3_uri,
                }
            }

        if secret_key:
            # NOTE(review): this replaces any caller-supplied env_vars rather
            # than merging with them -- confirm that is intended.
            env_vars = {
                "SAGEMAKER_SUBMIT_DIRECTORY": "/opt/ml/model/code",
                "SAGEMAKER_PROGRAM": "inference.py",
                "SAGEMAKER_REGION": sagemaker_session.boto_region_name,
                "SAGEMAKER_CONTAINER_LOG_LEVEL": "10",
                "LOCAL_PYTHON": platform.python_version(),
            }

        return model_data, _update_env_vars(env_vars)
def _update_env_vars(env_vars: dict) -> dict:
    """Return a new dict of the module defaults overlaid with ``env_vars``.

    Args:
        env_vars: Caller-supplied environment variables; may be ``None`` or
            empty, in which case only the defaults are returned.

    Returns:
        A fresh dict; values from ``env_vars`` win over ``_DEFAULT_ENV_VARS``
        for keys present in both.
    """
    merged = dict(_DEFAULT_ENV_VARS)
    if not env_vars:
        return merged
    merged.update(env_vars)
    return merged