SageMaker V3 Inference Pipeline - ModelBuilder vs Core#

This notebook demonstrates how to create and deploy an inference pipeline in SageMaker V3. An inference pipeline chains multiple containers together, where the output of one container becomes the input to the next.

Prerequisites#

Note: Ensure you have sagemaker and ipywidgets installed in your environment. The ipywidgets package is required to monitor endpoint deployment progress in Jupyter notebooks.

What You’ll Learn#

  1. Train models using ModelTrainer (high-level training API)

  2. Package inference code with model artifacts using repack_model

  3. Create multi-container pipeline models with Model.create()

  4. Deploy pipelines using both low-level APIs and ModelBuilder

Pipeline Architecture#

Raw Data → [SKLearn: StandardScaler] → Scaled Data → [XGBoost: Classifier] → Predictions
  • Container 1 (Preprocessing): SKLearn StandardScaler normalizes input features

  • Container 2 (Inference): XGBoost binary classifier predicts outcomes
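The serial flow above can be sketched locally with plain Python functions standing in for the two containers. This is an illustration only; the function names (`preprocess`, `predict`, `serial_pipeline`) are made up for the sketch and are not SageMaker APIs.

```python
# A local sketch of serial pipeline execution (illustration only): each
# "container" is a plain function, and the output of one step becomes the
# input of the next, mirroring how SageMaker chains the two containers.

def preprocess(rows):
    # Stand-in for the SKLearn container: center each column on its mean
    means = [sum(col) / len(col) for col in zip(*rows)]
    return [[x - m for x, m in zip(row, means)] for row in rows]

def predict(rows):
    # Stand-in for the XGBoost container: a trivial threshold "classifier"
    return [1 if sum(row) > 0 else 0 for row in rows]

def serial_pipeline(data, containers):
    for container in containers:
        data = container(data)  # each container's output feeds the next
    return data

print(serial_pipeline([[1.0, 2.0], [3.0, 4.0]], [preprocess, predict]))  # [0, 1]
```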

Why Use Inference Pipelines?#

  • Separation of concerns: Preprocessing and inference logic in separate containers

  • Reusability: Same preprocessing can be used with different models

  • Scalability: Each container can be optimized independently

  • Maintainability: Update one component without affecting others


Step 1: Setup and Data Preparation#

We start by importing the required modules and creating synthetic data for our binary classification task. The data has features at different scales to demonstrate the value of preprocessing.

import uuid
import os
import tempfile
import numpy as np
import pandas as pd
import boto3

from sagemaker.core.resources import Model, Endpoint, EndpointConfig
from sagemaker.core.shapes import ContainerDefinition, InferenceExecutionConfig, ProductionVariant
from sagemaker.core.image_uris import retrieve
from sagemaker.core.utils import repack_model
from sagemaker.core.helper.session_helper import Session, get_execution_role
from sagemaker.train.model_trainer import ModelTrainer
from sagemaker.train.configs import SourceCode, InputData
from sagemaker.serve import ModelBuilder
# Initialize session
sagemaker_session = Session()
role = get_execution_role()
region = sagemaker_session.boto_region_name
bucket = sagemaker_session.default_bucket()
unique_id = str(uuid.uuid4())[:8]
prefix = f"inference-pipeline-v3/{unique_id}"

print(f"Region: {region}")
print(f"Bucket: {bucket}")
print(f"Prefix: {prefix}")
# Generate synthetic data
np.random.seed(42)
n_samples = 1000

feature1 = np.random.normal(100, 15, n_samples)
feature2 = np.random.normal(50, 10, n_samples)
feature3 = np.random.normal(0.5, 0.1, n_samples)
feature4 = np.random.normal(1000, 200, n_samples)
target = ((feature1 > 100) & (feature2 > 50) | (feature4 > 1100)).astype(int)

df = pd.DataFrame({
    'feature1': feature1, 'feature2': feature2,
    'feature3': feature3, 'feature4': feature4, 'target': target
})

train_df = df[:800]
test_df = df[800:]

# Upload training data
data_dir = tempfile.mkdtemp()
train_file = os.path.join(data_dir, 'train.csv')
train_df.to_csv(train_file, index=False, header=False)

s3_client = boto3.client('s3')
train_s3_key = f"{prefix}/data/train.csv"
s3_client.upload_file(train_file, bucket, train_s3_key)
train_data_uri = f"s3://{bucket}/{train_s3_key}"
print(f"Training data: {train_data_uri}")

Step 2: Train SKLearn Model with ModelTrainer#

ModelTrainer is the V3 high-level API for training. It simplifies job creation compared to the low-level TrainingJob.create() API.

Key components:

  • SourceCode: Points to your training script and source directory

  • InputData: Defines training data channels

  • The training script contains only training logic; inference code is added separately later


# Create SKLearn training script (training only - no inference functions)
sklearn_source_dir = tempfile.mkdtemp()

sklearn_train_script = '''import argparse, os, joblib
import pandas as pd
from sklearn.preprocessing import StandardScaler

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
    args = parser.parse_args()
    
    train_files = [os.path.join(args.train, f) for f in os.listdir(args.train) if f.endswith(".csv")]
    df = pd.concat([pd.read_csv(f, header=None) for f in train_files])
    X = df.iloc[:, :4].values
    
    scaler = StandardScaler()
    scaler.fit(X)
    
    os.makedirs(args.model_dir, exist_ok=True)
    joblib.dump(scaler, os.path.join(args.model_dir, "model.joblib"))
    print(f"Model saved to {args.model_dir}")
'''

with open(os.path.join(sklearn_source_dir, 'train.py'), 'w') as f:
    f.write(sklearn_train_script)

print(f"SKLearn training script: {sklearn_source_dir}")
# Get SKLearn training image
sklearn_training_image = retrieve(
    framework="sklearn", region=region, version="1.4-2", py_version="py3"
)
print(f"SKLearn training image: {sklearn_training_image}")
# Train SKLearn model using ModelTrainer
sklearn_trainer = ModelTrainer(
    training_image=sklearn_training_image,
    source_code=SourceCode(
        source_dir=sklearn_source_dir,
        entry_script="train.py"
    ),
    base_job_name="sklearn-preprocess",
    role=role,
    sagemaker_session=sagemaker_session
)

sklearn_trainer.train(
    input_data_config=[InputData(channel_name="train", data_source=train_data_uri)]
)

sklearn_model_uri = sklearn_trainer._latest_training_job.model_artifacts.s3_model_artifacts
print(f"SKLearn model artifacts: {sklearn_model_uri}")

Step 3: Train XGBoost Model with ModelTrainer#

We train an XGBoost classifier using the same ModelTrainer pattern. Note that we pass hyperparameters directly to the trainer.

# Create XGBoost training script
xgboost_source_dir = tempfile.mkdtemp()

xgboost_train_script = '''import argparse, os
import pandas as pd
import xgboost as xgb

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
    parser.add_argument("--num-round", type=int, default=100)
    parser.add_argument("--max-depth", type=int, default=5)
    parser.add_argument("--eta", type=float, default=0.2)
    args = parser.parse_args()
    
    train_files = [os.path.join(args.train, f) for f in os.listdir(args.train) if f.endswith(".csv")]
    df = pd.concat([pd.read_csv(f, header=None) for f in train_files])
    X, y = df.iloc[:, :4].values, df.iloc[:, 4].values
    
    dtrain = xgb.DMatrix(X, label=y)
    params = {"max_depth": args.max_depth, "eta": args.eta, "objective": "binary:logistic"}
    model = xgb.train(params, dtrain, num_boost_round=args.num_round)
    
    os.makedirs(args.model_dir, exist_ok=True)
    model.save_model(os.path.join(args.model_dir, "xgboost-model"))
    print(f"Model saved to {args.model_dir}")
'''

with open(os.path.join(xgboost_source_dir, 'train.py'), 'w') as f:
    f.write(xgboost_train_script)

print(f"XGBoost training script: {xgboost_source_dir}")
# Get XGBoost training image
xgboost_training_image = retrieve(
    framework="xgboost", region=region, version="3.0-5",
)
print(f"XGBoost training image: {xgboost_training_image}")
# Train XGBoost model using ModelTrainer
xgboost_trainer = ModelTrainer(
    training_image=xgboost_training_image,
    source_code=SourceCode(
        source_dir=xgboost_source_dir,
        entry_script="train.py"
    ),
    hyperparameters={
        "num-round": 100,
        "max-depth": 5,
        "eta": 0.2
    },
    base_job_name="xgboost-classifier",
    role=role,
    sagemaker_session=sagemaker_session
)

xgboost_trainer.train(
    input_data_config=[InputData(channel_name="train", data_source=train_data_uri)]
)

xgboost_model_uri = xgboost_trainer._latest_training_job.model_artifacts.s3_model_artifacts
print(f"XGBoost model artifacts: {xgboost_model_uri}")

Step 4: Create Inference Scripts and Repack Models#

Training produces model artifacts (e.g., model.tar.gz), but these don’t include inference code. The repack_model utility:

  1. Downloads the original model artifacts from S3

  2. Extracts them to a temporary directory

  3. Adds your inference script to a code/ subdirectory

  4. Re-packages and uploads to S3

Important for pipelines: The output_fn must return a tuple (data, content_type) to explicitly set the content type passed to the next container. Without this, intermediate containers receive application/json as the default accept type.
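The resulting archive layout can be reproduced locally with the standard library. This is a sketch of the structure only, not a call to repack_model itself; the file names mirror the scripts in this notebook.

```python
# Sketch of the archive layout repack_model produces: the original model
# artifact at the root, the inference script under a code/ subdirectory.
import os
import tarfile
import tempfile

workdir = tempfile.mkdtemp()
os.makedirs(os.path.join(workdir, "code"))
open(os.path.join(workdir, "model.joblib"), "wb").close()        # model artifact
with open(os.path.join(workdir, "code", "inference.py"), "w") as f:
    f.write("def model_fn(model_dir): ...\n")                    # inference code

archive = os.path.join(workdir, "model.tar.gz")
with tarfile.open(archive, "w:gz") as tar:
    tar.add(os.path.join(workdir, "model.joblib"), arcname="model.joblib")
    tar.add(os.path.join(workdir, "code", "inference.py"), arcname="code/inference.py")

with tarfile.open(archive) as tar:
    print(sorted(tar.getnames()))  # ['code/inference.py', 'model.joblib']
```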

# Create SKLearn inference script
sklearn_inference_dir = tempfile.mkdtemp()

sklearn_inference_script = '''import joblib, os
import numpy as np

def model_fn(model_dir):
    return joblib.load(os.path.join(model_dir, "model.joblib"))

def input_fn(request_body, request_content_type):
    if request_content_type == "text/csv":
        return np.array([[float(x) for x in line.split(",")] for line in request_body.strip().split("\\n")])
    raise ValueError(f"Unsupported content type: {request_content_type}")

def predict_fn(input_data, model):
    return model.transform(input_data)

def output_fn(prediction, accept):
    # Always return CSV with explicit content-type for pipeline compatibility
    csv_output = "\\n".join([",".join([str(x) for x in row]) for row in prediction])
    return csv_output, "text/csv"
'''

with open(os.path.join(sklearn_inference_dir, 'inference.py'), 'w') as f:
    f.write(sklearn_inference_script)

print(f"SKLearn inference script: {sklearn_inference_dir}")
# Repack SKLearn model with inference code using repack_model utility
sklearn_repacked_uri = f"s3://{bucket}/{prefix}/sklearn/repacked/model.tar.gz"

repack_model(
    inference_script="inference.py",
    source_directory=sklearn_inference_dir,
    dependencies=[],
    model_uri=sklearn_model_uri,
    repacked_model_uri=sklearn_repacked_uri,
    sagemaker_session=sagemaker_session
)

print(f"Repacked SKLearn model: {sklearn_repacked_uri}")
# XGBoost uses built-in inference - no custom script needed for basic CSV input/output
# The XGBoost container handles text/csv natively
xgboost_repacked_uri = xgboost_model_uri
print(f"XGBoost model (no repack needed): {xgboost_repacked_uri}")


Alternative: Low-level API Deployment#

This section demonstrates the low-level approach using Model.create() with multiple ContainerDefinition objects. Use this when you need fine-grained control over the deployment configuration.

Key parameters:

  • containers: List of ContainerDefinition objects executed in order

  • container_hostname: Identifies each container in logs and metrics

  • inference_execution_config: Set to Serial for pipeline execution

  • environment: Must include SAGEMAKER_PROGRAM and SAGEMAKER_SUBMIT_DIRECTORY for custom inference scripts

# Retrieve inference images (the SKLearn and XGBoost framework containers
# serve both training and inference)
sklearn_inference_image = retrieve(
    framework="sklearn", region=region, version="1.4-2", py_version="py3"
)
xgboost_inference_image = retrieve(
    framework="xgboost", region=region, version="3.0-5",
)

# Create inference pipeline model
pipeline_model_name = f"pipeline-model-{unique_id}"

pipeline_model = Model.create(
    model_name=pipeline_model_name,
    containers=[
        ContainerDefinition(
            container_hostname="preprocessing",
            image=sklearn_inference_image,
            model_data_url=sklearn_repacked_uri,
            environment={
                "SAGEMAKER_PROGRAM": "inference.py",
                "SAGEMAKER_SUBMIT_DIRECTORY": "/opt/ml/model/code"
            }
        ),
        ContainerDefinition(
            container_hostname="inference",
            image=xgboost_inference_image,
            model_data_url=xgboost_repacked_uri
        )
    ],
    inference_execution_config=InferenceExecutionConfig(mode="Serial"),
    execution_role_arn=role
)

print(f"Pipeline model created: {pipeline_model.model_name}")

Deploy the Inference Pipeline#

Deployment requires creating an EndpointConfig and then an Endpoint. This is the low-level approach that gives you full control over the deployment configuration.

# Create endpoint config and endpoint
endpoint_config_name = f"pipeline-config-{unique_id}"
endpoint_name = f"pipeline-endpoint-{unique_id}"

endpoint_config = EndpointConfig.create(
    endpoint_config_name=endpoint_config_name,
    production_variants=[
        ProductionVariant(
            variant_name="AllTraffic",
            model_name=pipeline_model_name,
            initial_instance_count=1,
            instance_type="ml.m5.large"
        )
    ]
)

endpoint = Endpoint.create(
    endpoint_name=endpoint_name,
    endpoint_config_name=endpoint_config_name
)

print(f"Creating endpoint: {endpoint_name}")
endpoint.wait_for_status(target_status="InService")
print(f"Endpoint ready: {endpoint_name}")

Test the Inference Pipeline#

When invoking the pipeline:

  • Your input goes to Container 1 (SKLearn preprocessing)

  • Container 1’s output automatically flows to Container 2 (XGBoost)

  • Container 2’s output is returned as the final response

The content_type you specify applies to Container 1’s input, and accept applies to Container 2’s output.

# Test inference
test_samples = test_df.iloc[:5, :4].values
test_labels = test_df.iloc[:5, 4].values

csv_data = "\n".join([",".join([str(x) for x in row]) for row in test_samples])

response = endpoint.invoke(
    body=csv_data,
    content_type="text/csv",
    accept="text/csv"
)

result = response.body.read().decode('utf-8')
predictions = [float(x) for x in result.strip().split('\n')]

print("Pipeline Inference Results:")
print(f"Predictions (probabilities): {predictions}")
print(f"Binary predictions: {[1 if p > 0.5 else 0 for p in predictions]}")
print(f"Actual labels: {list(test_labels)}")

Clean Up#

Delete resources in reverse order of creation: Endpoint → EndpointConfig → Model.

# Clean up resources
print("Cleaning up...")

try:
    endpoint.delete()
    print(f"Deleted endpoint: {endpoint_name}")
except Exception as e:
    print(f"Error: {e}")

try:
    endpoint_config.delete()
    print(f"Deleted endpoint config: {endpoint_config_name}")
except Exception as e:
    print(f"Error: {e}")

try:
    pipeline_model.delete()
    print(f"Deleted model: {pipeline_model_name}")
except Exception as e:
    print(f"Error: {e}")

print("Cleanup completed!")

Summary#

This notebook demonstrated two approaches for deploying inference pipelines in SageMaker V3.

Approach 2: Low-level APIs (Full Control)#

  • Training: ModelTrainer (same as above)

  • Repacking: repack_model() (same as above)

  • Model: Model.create(containers=[...]) creates the multi-container pipeline model

  • Deploy: EndpointConfig + Endpoint gives explicit endpoint configuration

Key Concepts#

Training vs Inference Code Separation:

  • Training scripts focus on model fitting

  • Inference logic added via repack_model

Pipeline Data Flow:

  • content_type in invoke() → applies to first container’s input

  • accept in invoke() → applies to last container’s output

  • Intermediate data: controlled by output_fn return value
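The handoff between containers can be sketched with the two hooks side by side. This is a local illustration with no endpoint involved; xgboost_input_fn is hypothetical here (the notebook relies on the XGBoost container's built-in CSV handling) and only shows the shape of the handoff.

```python
# Local sketch of the handoff: container 1's output_fn returns
# (payload, content_type), which SageMaker delivers to container 2 as
# (request_body, request_content_type).

def sklearn_output_fn(prediction, accept):
    body = "\n".join(",".join(str(x) for x in row) for row in prediction)
    return body, "text/csv"  # explicit content type for the next container

def xgboost_input_fn(request_body, request_content_type):
    if request_content_type != "text/csv":
        raise ValueError(f"Unsupported content type: {request_content_type}")
    return [[float(x) for x in line.split(",")] for line in request_body.split("\n")]

payload, ctype = sklearn_output_fn([[0.1, 0.2], [0.3, 0.4]], "text/csv")
print(xgboost_input_fn(payload, ctype))  # [[0.1, 0.2], [0.3, 0.4]]
```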

When to Use Each Approach#

  • ModelBuilder: Quick deployment, recommended for most use cases

  • Low-level APIs: Fine-grained control over endpoint configuration