E2E ML Pipeline with Model Registry

Build a SageMaker Pipeline that preprocesses the Abalone dataset, trains an XGBoost model, creates a SageMaker Model, and registers it to the Model Registry.

from sagemaker.train import ModelTrainer
from sagemaker.train.configs import InputData
from sagemaker.core.processing import (
    ScriptProcessor,
)
from sagemaker.core.shapes import (
    ProcessingInput,
    ProcessingS3Input,
    ProcessingOutput,
    ProcessingS3Output
)
from sagemaker.serve.model_builder import ModelBuilder

from sagemaker.core.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.mlops.workflow.pipeline import Pipeline
from sagemaker.mlops.workflow.steps import ProcessingStep, TrainingStep, CacheConfig
from sagemaker.mlops.workflow.model_step import ModelStep
from sagemaker.core.workflow.pipeline_context import PipelineSession
from sagemaker.core.helper.session_helper import Session, get_execution_role
from sagemaker.core import image_uris
from sagemaker.train.configs import Compute
# Create the SageMaker sessions. The plain Session is used for account/bucket
# lookups; the PipelineSession is passed to processors/trainers so their
# .run()/.train() calls return step arguments for the pipeline instead of
# launching jobs immediately.

sagemaker_session = Session()
pipeline_session = PipelineSession()
sm_client = sagemaker_session.sagemaker_client
region = sagemaker_session.boto_region_name
prefix = "pipeline-v3"  # S3 key prefix for this pipeline's processing outputs

account_id = sagemaker_session.account_id()
# Define variables and parameters needed for the Pipeline steps

# Resolve the execution role and S3 locations used throughout the pipeline.
role = get_execution_role()
default_bucket = sagemaker_session.default_bucket()
base_job_prefix = "v3-pipeline-example"
s3_prefix = "v3-test-pipeline"
default_bucket_prefix = sagemaker_session.default_bucket_prefix

# When the session is configured with a default bucket prefix, nest both
# prefixes under it so artifacts land in the account's agreed S3 namespace.
if default_bucket_prefix:
    s3_prefix = "/".join((default_bucket_prefix, s3_prefix))
    base_job_prefix = "/".join((default_bucket_prefix, base_job_prefix))

# Pipeline parameters: values that can be overridden per execution without
# changing the pipeline definition. (The original cell defined
# ModelApprovalStatus twice; the duplicate is removed here.)
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputDataUrl",
    default_value="s3://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv",
)
# XGBoost hyperparameter exposed as a pipeline parameter (passed as a string).
hyperparameter_max_depth = ParameterString(name="MaxDepth", default_value="5")

# Cache Pipeline steps to reduce execution time on subsequent executions
cache_config = CacheConfig(enable_caching=True, expire_after="30d")
!mkdir -p code
%%writefile code/preprocess.py

"""Feature engineers the abalone dataset."""
import argparse
import logging
import os
import pathlib
import requests
import tempfile

import boto3
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# Root logger configured to echo INFO-and-above messages to stderr.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


# The raw CSV arrives without a header row, so column names are fixed here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

# All features are numeric floats except the categorical "sex" column.
feature_columns_dtype = {
    name: (str if name == "sex" else np.float64) for name in feature_columns_names
}
label_column_dtype = {label_column: np.float64}


def merge_two_dicts(x, y):
    """Return a new dict with x's entries overridden by y's; inputs untouched."""
    return {**x, **y}


if __name__ == "__main__":
    logger.debug("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    args = parser.parse_args()

    base_dir = "/opt/ml/processing"
    # Ensure every working directory exists. SageMaker pre-creates the output
    # local_paths declared on the ProcessingStep, but creating them here keeps
    # the script runnable outside the processing container as well.
    for subdir in ("data", "train", "validation", "test"):
        pathlib.Path(f"{base_dir}/{subdir}").mkdir(parents=True, exist_ok=True)
    input_data = args.input_data
    # Parse "s3://bucket/key..." into bucket and key.
    bucket = input_data.split("/")[2]
    key = "/".join(input_data.split("/")[3:])

    logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
    fn = f"{base_dir}/data/abalone-dataset.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, fn)

    logger.debug("Reading downloaded data.")
    df = pd.read_csv(
        fn,
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )
    os.unlink(fn)  # the raw download is no longer needed

    logger.debug("Defining transformers.")
    # Numeric columns: median-impute then standardize.
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )

    # Categorical "sex" column: constant-impute then one-hot encode.
    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    logger.info("Applying transforms.")
    # XGBoost's CSV input expects the label in the first column, so the label
    # is prepended to the transformed features.
    y = df.pop(label_column)
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    logger.info("Splitting %d rows of data into train, validation, test datasets.", len(X))
    # 70% train / 15% validation / 15% test, after an in-place shuffle.
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    logger.info("Writing out datasets to %s.", base_dir)
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
# Processing step: run preprocess.py on a scikit-learn framework container.
# Using modern ScriptProcessor class instead of SKLearnProcessor.
sklearn_pipeline_processor = ScriptProcessor(
    image_uri=image_uris.retrieve(
        framework="sklearn",
        region=region,
        version="1.2-1",
        py_version="py3",
        instance_type="ml.m5.xlarge",
    ),
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,  # pipeline parameter, default 1
    base_job_name=f"{base_job_prefix}-sklearn-preprocess-job",
    sagemaker_session=pipeline_session,  # defers the job; .run() returns step args
    role=role,
)

# Because the processor uses the PipelineSession, .run() does not start a job;
# it returns the arguments the ProcessingStep below executes at pipeline runtime.
pipe_processor_args = sklearn_pipeline_processor.run(
    inputs=[
        ProcessingInput(
            input_name="input-1",
            s3_input=ProcessingS3Input(
                s3_uri=input_data,  # InputDataUrl pipeline parameter
                local_path="/opt/ml/processing/input",
                s3_data_type="S3Prefix",
                s3_input_mode="File",
                # NOTE(review): ShardedByS3Key splits the input across instances;
                # with the default instance count of 1 this behaves like
                # FullyReplicated — confirm intent before scaling out.
                s3_data_distribution_type="ShardedByS3Key",
            )
        )
    ],
    # Each output maps a container-local directory (written by preprocess.py)
    # to an S3 destination under the "pipeline-v3" prefix.
    outputs=[
        ProcessingOutput(
            output_name="train",
            s3_output=ProcessingS3Output(
                s3_uri=f"s3://{sagemaker_session.default_bucket()}/{prefix}/train",
                local_path="/opt/ml/processing/train",
                s3_upload_mode="EndOfJob"
            )
        ),
        ProcessingOutput(
            output_name="validation",
            s3_output=ProcessingS3Output(
                s3_uri=f"s3://{sagemaker_session.default_bucket()}/{prefix}/validation",
                local_path="/opt/ml/processing/validation",
                s3_upload_mode="EndOfJob"
            )
        ),
        ProcessingOutput(
            output_name="test",
            s3_output=ProcessingS3Output(
                s3_uri=f"s3://{sagemaker_session.default_bucket()}/{prefix}/test",
                local_path="/opt/ml/processing/test",
                s3_upload_mode="EndOfJob"
            )
        ),
    ],
    code="code/preprocess.py",
    arguments=["--input-data", input_data],  # forwarded to argparse in the script
)

step_process = ProcessingStep(
    name="PreprocessAbaloneData",
    step_args=pipe_processor_args,
    cache_config=cache_config,  # reuse results across executions for 30 days
)

Training Step

# Define the output path for the model artifacts from the training job
# NOTE(review): model_path is defined but never passed to ModelTrainer below,
# so the trainer will use its own default output location — confirm whether
# this variable was meant to be wired in.
model_path = f"s3://{default_bucket}/{base_job_prefix}/V3PipelineTrain"

image_uri = image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

# Using modern ModelTrainer class instead of Estimator
model_trainer = ModelTrainer(
    training_image=image_uri,
    compute=Compute(
        instance_type=training_instance_type,  # pipeline parameter
        instance_count=1,
    ),
    base_job_name=f"{base_job_prefix}-xgboost-train",
    sagemaker_session=pipeline_session,  # defers the job; .train() returns step args
    role=role,
    hyperparameters={
        # NOTE(review): "reg:linear" and "silent" are deprecated aliases in
        # newer XGBoost releases; they are accepted by the pinned 1.0-1 image.
        "objective": "reg:linear",
        "num_round": 50,
        "max_depth": hyperparameter_max_depth,  # MaxDepth pipeline parameter
        "eta": 0.2,
        "gamma": 4,
        "min_child_weight": 6,
        "subsample": 0.7,
        "silent": 0,
    },
    # The Properties references below create data dependencies on the
    # processing step, so it is guaranteed to run before training.
    input_data_config=[
        InputData(
            channel_name="train",
            data_source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv"
        ),
        InputData(
            channel_name="validation",
            data_source=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv"
        ),
    ],
)

# Deferred by the PipelineSession: returns step arguments, starts no job.
train_args = model_trainer.train()

step_train = TrainingStep(
    name="TrainModelTestStep",
    step_args=train_args,
    cache_config=cache_config,
)

Create Model Step

# Create Model using ModelBuilder: pairs the training step's artifacts with
# the same XGBoost image. The Properties reference makes this step depend on
# step_train.
model_builder = ModelBuilder(
    s3_model_data_url=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    image_uri=image_uri,
    sagemaker_session=pipeline_session,
    role_arn=role,
)

step_create_model = ModelStep(
    name="CreateModel",
    step_args=model_builder.build()  # deferred: emits CreateModel step args
)

Register Model Step

# Register the model to the Model Registry. The approval status comes from the
# ModelApprovalStatus pipeline parameter (default "PendingManualApproval") —
# previously it was hard-coded to "Approved", which left the declared
# parameter unused.
step_register_model = ModelStep(
    name="RegisterModel",
    step_args=model_builder.register(
        model_package_group_name="my-model-package-group",
        content_types=["application/json"],
        response_types=["application/json"],
        inference_instances=["ml.m5.xlarge"],
        approval_status=model_approval_status
    )
)
# Assemble the pipeline. Step ordering is derived from the Properties
# references between steps, not from the order of this list.
pipeline = Pipeline(
    name="pipeline-v3",
    parameters=[
        processing_instance_count,
        training_instance_type,
        input_data,
        model_approval_status,
        hyperparameter_max_depth
    ],
    steps=[step_process, step_train, step_create_model, step_register_model],
    sagemaker_session=pipeline_session,
)
# Render the JSON definition, create/update the pipeline, then run it.
pipeline.definition()
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()  # blocks until the execution completes or fails