# E2E ML Pipeline with Model Registry
Build a SageMaker Pipeline that processes data, trains a model, and registers the model in the SageMaker Model Registry.
from sagemaker.train import ModelTrainer
from sagemaker.train.configs import InputData
from sagemaker.core.processing import (
    ScriptProcessor,
)
from sagemaker.core.shapes import (
    ProcessingInput,
    ProcessingS3Input,
    ProcessingOutput,
    ProcessingS3Output
)
from sagemaker.serve.model_builder import ModelBuilder
from sagemaker.core.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.mlops.workflow.pipeline import Pipeline
from sagemaker.mlops.workflow.steps import ProcessingStep, TrainingStep, CacheConfig
from sagemaker.mlops.workflow.model_step import ModelStep
from sagemaker.core.workflow.pipeline_context import PipelineSession
from sagemaker.core.helper.session_helper import Session, get_execution_role
from sagemaker.core import image_uris
from sagemaker.train.configs import Compute
from sagemaker.train.configs import OutputDataConfig
# Create the SageMaker sessions. `sagemaker_session` is used for immediate API
# calls (bucket, region, account); `pipeline_session` is handed to the
# processor / trainer / model builder below so that their run() / train() /
# build() / register() results can be passed to pipeline steps as step_args.
sagemaker_session = Session()
pipeline_session = PipelineSession()
sm_client = sagemaker_session.sagemaker_client
region = sagemaker_session.boto_region_name
prefix = "pipeline-v3"
account_id = sagemaker_session.account_id()

# Define variables and parameters needed for the Pipeline steps
role = get_execution_role()
default_bucket = sagemaker_session.default_bucket()
base_job_prefix = "v3-pipeline-example"
s3_prefix = "v3-test-pipeline"
default_bucket_prefix = sagemaker_session.default_bucket_prefix

# If a default bucket prefix is configured, nest the S3 paths under it.
# NOTE(review): base_job_prefix is also used to build base_job_name values
# further down; a "/" from the bucket prefix may not be valid in job names —
# confirm when a default bucket prefix is set.
if default_bucket_prefix:
    s3_prefix = f"{default_bucket_prefix}/{s3_prefix}"
    base_job_prefix = f"{default_bucket_prefix}/{base_job_prefix}"

# Pipeline parameters, each with a default value. Every one of them is listed
# in the Pipeline definition at the end of the notebook.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
# Declared exactly once; it is consumed by the model registration step.
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputDataUrl",
    default_value="s3://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv",
)
hyperparameter_max_depth = ParameterString(name="MaxDepth", default_value="5")

# Cache Pipeline steps to reduce execution time on subsequent executions
cache_config = CacheConfig(enable_caching=True, expire_after="30d")
# Create the local "code" directory that the following %%writefile cell writes preprocess.py into.
!mkdir -p code
%%writefile code/preprocess.py
"""Feature engineers the abalone dataset."""
import argparse
import logging
import os
import pathlib
import requests
import tempfile
import boto3
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
# Log INFO and above from the root logger to stderr.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# The input CSV has no header row, so column names (in file order) are declared
# here. "sex" is the only string column; all other features are float64.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    name: (str if name == "sex" else np.float64) for name in feature_columns_names
}
label_column_dtype = {label_column: np.float64}
def merge_two_dicts(x, y):
    """Return a fresh copy of ``x`` overlaid with the entries of ``y``.

    Keys present in both take their value from ``y``. Neither input is modified.
    """
    merged = x.copy()
    merged.update(y)
    return merged
def split_s3_uri(uri):
    """Split an ``s3://bucket/key`` URI into ``(bucket, key)``.

    Validates the scheme and that both a bucket and a key are present, so a
    malformed ``--input-data`` value fails fast with a clear message.

    Args:
        uri: S3 URI such as ``s3://my-bucket/path/to/file.csv``.

    Returns:
        Tuple of the bucket name and the object key (without a leading slash).

    Raises:
        ValueError: If ``uri`` is not of the form ``s3://bucket/key``.
    """
    scheme, separator, remainder = uri.partition("://")
    bucket, _, key = remainder.partition("/")
    if scheme != "s3" or not separator or not bucket or not key:
        raise ValueError(f"Expected an s3://bucket/key URI, got {uri!r}")
    return bucket, key


if __name__ == "__main__":
    logger.debug("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    args = parser.parse_args()

    base_dir = "/opt/ml/processing"
    pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True)

    # Download the raw CSV directly from the S3 URI given on the command line.
    input_data = args.input_data
    bucket, key = split_s3_uri(input_data)
    logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
    fn = f"{base_dir}/data/abalone-dataset.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, fn)

    logger.debug("Reading downloaded data.")
    df = pd.read_csv(
        fn,
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )
    os.unlink(fn)

    logger.debug("Defining transformers.")
    # Numeric features: median imputation followed by standardization.
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )

    # Categorical feature: fill gaps with "missing", then one-hot encode
    # (categories unseen at fit time are ignored rather than raising).
    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    logger.info("Applying transforms.")
    y = df.pop(label_column)
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)
    # Label in column 0, transformed features after it; the SageMaker built-in
    # XGBoost algorithm expects the target in the first column of CSV input.
    X = np.concatenate((y_pre, X_pre), axis=1)

    logger.info("Splitting %d rows of data into train, validation, test datasets.", len(X))
    np.random.shuffle(X)
    # 70% train / 15% validation / 15% test.
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    logger.info("Writing out datasets to %s.", base_dir)
    # Each split goes to <base_dir>/<name>/<name>.csv, matching the processing
    # output local paths. Create the directory instead of assuming it exists.
    for split_name, split_rows in (
        ("train", train),
        ("validation", validation),
        ("test", test),
    ):
        split_dir = pathlib.Path(base_dir) / split_name
        split_dir.mkdir(parents=True, exist_ok=True)
        pd.DataFrame(split_rows).to_csv(
            split_dir / f"{split_name}.csv", header=False, index=False
        )
# Processing step: run code/preprocess.py inside the scikit-learn 1.2-1 image
# through the generic ScriptProcessor (used here in place of SKLearnProcessor).
sklearn_pipeline_processor = ScriptProcessor(
    role=role,
    image_uri=image_uris.retrieve(
        framework="sklearn",
        region=region,
        version="1.2-1",
        py_version="py3",
        instance_type="ml.m5.xlarge",
    ),
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}-sklearn-preprocess-job",
    sagemaker_session=pipeline_session,
)

pipe_processor_args = sklearn_pipeline_processor.run(
    code="code/preprocess.py",
    arguments=["--input-data", input_data],
    # The raw dataset is also staged at /opt/ml/processing/input.
    # NOTE(review): preprocess.py downloads the CSV itself via boto3 from
    # --input-data and never reads this staged copy.
    inputs=[
        ProcessingInput(
            input_name="input-1",
            s3_input=ProcessingS3Input(
                s3_uri=input_data,
                local_path="/opt/ml/processing/input",
                s3_data_type="S3Prefix",
                s3_input_mode="File",
                s3_data_distribution_type="ShardedByS3Key",
            ),
        )
    ],
    # One output channel per split written by preprocess.py, each uploaded to
    # s3://<default bucket>/<prefix>/<channel> at the end of the job.
    outputs=[
        ProcessingOutput(
            output_name=channel,
            s3_output=ProcessingS3Output(
                s3_uri=f"s3://{default_bucket}/{prefix}/{channel}",
                local_path=f"/opt/ml/processing/{channel}",
                s3_upload_mode="EndOfJob",
            ),
        )
        for channel in ("train", "validation", "test")
    ],
)

step_process = ProcessingStep(
    name="PreprocessAbaloneData",
    step_args=pipe_processor_args,
    cache_config=cache_config,
)
## Training Step
# Output path for the model artifacts produced by the training job.
model_path = f"s3://{default_bucket}/{base_job_prefix}/V3PipelineTrain"

# Built-in XGBoost 1.0-1 image; also reused by the ModelBuilder further down.
image_uri = image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

# ModelTrainer (used here in place of Estimator). Both input channels reference
# the S3 outputs of the processing step.
model_trainer = ModelTrainer(
    training_image=image_uri,
    compute=Compute(
        instance_type=training_instance_type,
        instance_count=1,
    ),
    base_job_name=f"{base_job_prefix}-xgboost-train",
    sagemaker_session=pipeline_session,
    role=role,
    # Write model artifacts to model_path rather than the SDK's default location.
    output_data_config=OutputDataConfig(s3_output_path=model_path),
    hyperparameters={
        # "reg:linear" is the deprecated alias of "reg:squarederror".
        "objective": "reg:squarederror",
        "num_round": 50,
        "max_depth": hyperparameter_max_depth,
        "eta": 0.2,
        "gamma": 4,
        "min_child_weight": 6,
        "subsample": 0.7,
        "silent": 0,
    },
    input_data_config=[
        InputData(
            channel_name="train",
            data_source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        InputData(
            channel_name="validation",
            data_source=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    ],
)

train_args = model_trainer.train()
step_train = TrainingStep(
    name="TrainModelTestStep",
    step_args=train_args,
    cache_config=cache_config,
)
## Create Model Step
# Build a SageMaker Model from the training step's artifacts, served with the
# same XGBoost image that was used for training.
trained_model_uri = step_train.properties.ModelArtifacts.S3ModelArtifacts
model_builder = ModelBuilder(
    s3_model_data_url=trained_model_uri,
    image_uri=image_uri,
    sagemaker_session=pipeline_session,
    role_arn=role,
)
create_model_args = model_builder.build()
step_create_model = ModelStep(name="CreateModel", step_args=create_model_args)
## Register Model Step
# Register the trained model as a new version in the model package group.
step_register_model = ModelStep(
    name="RegisterModel",
    step_args=model_builder.register(
        model_package_group_name="my-model-package-group",
        # The model is trained on text/csv channels, so advertise CSV for
        # inference requests and responses.
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.m5.xlarge"],
        # Use the ModelApprovalStatus pipeline parameter (default
        # "PendingManualApproval") instead of auto-approving every version.
        approval_status=model_approval_status,
    ),
)
# Assemble the pipeline from the declared parameters and the four steps.
pipeline_parameters = [
    processing_instance_count,
    training_instance_type,
    input_data,
    model_approval_status,
    hyperparameter_max_depth,
]
pipeline_steps = [step_process, step_train, step_create_model, step_register_model]
pipeline = Pipeline(
    name="pipeline-v3",
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=pipeline_session,
)

# Render the pipeline definition (sanity check before submitting it).
pipeline.definition()

# Upsert the pipeline with the execution role, start a run, and block until it finishes.
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()