SageMaker V3 Inference Pipeline - ModelBuilder vs Core#
This notebook demonstrates how to create and deploy an inference pipeline in SageMaker V3. An inference pipeline chains multiple containers together, where the output of one container becomes the input to the next.
Prerequisites#
Note: Ensure you have sagemaker and ipywidgets installed in your environment. The ipywidgets package is required to monitor endpoint deployment progress in Jupyter notebooks.
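Both can be installed from a terminal or notebook cell; exact version pins depend on your environment, so this is just a sketch:

```shell
pip install --upgrade sagemaker ipywidgets
```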
What You’ll Learn#
Train models using ModelTrainer (high-level training API)
Package inference code with model artifacts using repack_model
Create multi-container pipeline models with Model.create()
Deploy pipelines using both low-level APIs and ModelBuilder
Pipeline Architecture#
Raw Data → [SKLearn: StandardScaler] → Scaled Data → [XGBoost: Classifier] → Predictions
Container 1 (Preprocessing): SKLearn StandardScaler normalizes input features
Container 2 (Inference): XGBoost binary classifier predicts outcomes
Why Use Inference Pipelines?#
Separation of concerns: Preprocessing and inference logic in separate containers
Reusability: Same preprocessing can be used with different models
Scalability: Each container can be optimized independently
Maintainability: Update one component without affecting others
Step 1: Setup and Data Preparation#
We start by importing the required modules and creating synthetic data for our binary classification task. The data has features at different scales to demonstrate the value of preprocessing.
import uuid
import os
import tempfile
import numpy as np
import pandas as pd
import boto3
from sagemaker.core.resources import Model, Endpoint, EndpointConfig
from sagemaker.core.shapes import ContainerDefinition, InferenceExecutionConfig, ProductionVariant
from sagemaker.core.image_uris import retrieve
from sagemaker.core.utils import repack_model
from sagemaker.core.helper.session_helper import Session, get_execution_role
from sagemaker.train.model_trainer import ModelTrainer
from sagemaker.train.configs import SourceCode, InputData
from sagemaker.serve import ModelBuilder
# Initialize session
sagemaker_session = Session()
role = get_execution_role()
region = sagemaker_session.boto_region_name
bucket = sagemaker_session.default_bucket()
unique_id = str(uuid.uuid4())[:8]
prefix = f"inference-pipeline-v3/{unique_id}"
print(f"Region: {region}")
print(f"Bucket: {bucket}")
print(f"Prefix: {prefix}")
# Generate synthetic data
np.random.seed(42)
n_samples = 1000
feature1 = np.random.normal(100, 15, n_samples)
feature2 = np.random.normal(50, 10, n_samples)
feature3 = np.random.normal(0.5, 0.1, n_samples)
feature4 = np.random.normal(1000, 200, n_samples)
target = ((feature1 > 100) & (feature2 > 50) | (feature4 > 1100)).astype(int)
df = pd.DataFrame({
'feature1': feature1, 'feature2': feature2,
'feature3': feature3, 'feature4': feature4, 'target': target
})
train_df = df[:800]
test_df = df[800:]
# Upload training data
data_dir = tempfile.mkdtemp()
train_file = os.path.join(data_dir, 'train.csv')
train_df.to_csv(train_file, index=False, header=False)
s3_client = boto3.client('s3')
train_s3_key = f"{prefix}/data/train.csv"
s3_client.upload_file(train_file, bucket, train_s3_key)
train_data_uri = f"s3://{bucket}/{train_s3_key}"
print(f"Training data: {train_data_uri}")
Step 2: Train SKLearn Model with ModelTrainer#
ModelTrainer is the V3 high-level API for training. It simplifies job creation compared to the low-level TrainingJob.create() API.
Key components:
SourceCode: Points to your training script and source directory
InputData: Defines training data channels
The training script only needs training logic - inference code is added separately later
# Create SKLearn training script (training only - no inference functions)
sklearn_source_dir = tempfile.mkdtemp()
sklearn_train_script = '''import argparse, os, joblib
import pandas as pd
from sklearn.preprocessing import StandardScaler
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
args = parser.parse_args()
train_files = [os.path.join(args.train, f) for f in os.listdir(args.train) if f.endswith(".csv")]
df = pd.concat([pd.read_csv(f, header=None) for f in train_files])
X = df.iloc[:, :4].values
scaler = StandardScaler()
scaler.fit(X)
os.makedirs(args.model_dir, exist_ok=True)
joblib.dump(scaler, os.path.join(args.model_dir, "model.joblib"))
print(f"Model saved to {args.model_dir}")
'''
with open(os.path.join(sklearn_source_dir, 'train.py'), 'w') as f:
f.write(sklearn_train_script)
print(f"SKLearn training script: {sklearn_source_dir}")
# Get SKLearn training image
sklearn_training_image = retrieve(
framework="sklearn", region=region, version="1.4-2", py_version="py3"
)
print(f"SKLearn training image: {sklearn_training_image}")
# Train SKLearn model using ModelTrainer
sklearn_trainer = ModelTrainer(
training_image=sklearn_training_image,
source_code=SourceCode(
source_dir=sklearn_source_dir,
entry_script="train.py"
),
base_job_name="sklearn-preprocess",
role=role,
sagemaker_session=sagemaker_session
)
sklearn_trainer.train(
input_data_config=[InputData(channel_name="train", data_source=train_data_uri)]
)
sklearn_model_uri = sklearn_trainer._latest_training_job.model_artifacts.s3_model_artifacts
print(f"SKLearn model artifacts: {sklearn_model_uri}")
Step 3: Train XGBoost Model with ModelTrainer#
We train an XGBoost classifier using the same ModelTrainer pattern. Note that we pass hyperparameters directly to the trainer.
# Create XGBoost training script
xgboost_source_dir = tempfile.mkdtemp()
xgboost_train_script = '''import argparse, os
import pandas as pd
import xgboost as xgb
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
parser.add_argument("--num-round", type=int, default=100)
parser.add_argument("--max-depth", type=int, default=5)
parser.add_argument("--eta", type=float, default=0.2)
args = parser.parse_args()
train_files = [os.path.join(args.train, f) for f in os.listdir(args.train) if f.endswith(".csv")]
df = pd.concat([pd.read_csv(f, header=None) for f in train_files])
X, y = df.iloc[:, :4].values, df.iloc[:, 4].values
dtrain = xgb.DMatrix(X, label=y)
params = {"max_depth": args.max_depth, "eta": args.eta, "objective": "binary:logistic"}
model = xgb.train(params, dtrain, num_boost_round=args.num_round)
os.makedirs(args.model_dir, exist_ok=True)
model.save_model(os.path.join(args.model_dir, "xgboost-model"))
print(f"Model saved to {args.model_dir}")
'''
with open(os.path.join(xgboost_source_dir, 'train.py'), 'w') as f:
f.write(xgboost_train_script)
print(f"XGBoost training script: {xgboost_source_dir}")
# Get XGBoost training image
xgboost_training_image = retrieve(
framework="xgboost", region=region, version="3.0-5",
)
print(f"XGBoost training image: {xgboost_training_image}")
# Train XGBoost model using ModelTrainer
xgboost_trainer = ModelTrainer(
training_image=xgboost_training_image,
source_code=SourceCode(
source_dir=xgboost_source_dir,
entry_script="train.py"
),
hyperparameters={
"num-round": 100,
"max-depth": 5,
"eta": 0.2
},
base_job_name="xgboost-classifier",
role=role,
sagemaker_session=sagemaker_session
)
xgboost_trainer.train(
input_data_config=[InputData(channel_name="train", data_source=train_data_uri)]
)
xgboost_model_uri = xgboost_trainer._latest_training_job.model_artifacts.s3_model_artifacts
print(f"XGBoost model artifacts: {xgboost_model_uri}")
Step 4: Create Inference Scripts and Repack Models#
Training produces model artifacts (e.g., model.tar.gz), but these don’t include inference code. The repack_model utility:
Downloads the original model artifacts from S3
Extracts them to a temporary directory
Adds your inference script to a code/ subdirectory
Re-packages and uploads to S3
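The resulting artifact layout can be sketched locally with the standard library. The `model.joblib` and `inference.py` files below are placeholder stand-ins for the real artifacts; only the tarball structure matters here:

```python
import tarfile
import tempfile
from pathlib import Path

workdir = Path(tempfile.mkdtemp())
(workdir / "model.joblib").write_text("stub model")             # placeholder for the trained artifact
code_dir = workdir / "code"
code_dir.mkdir()
(code_dir / "inference.py").write_text("# inference handlers")  # placeholder inference script

# Re-package: model file at the root, inference code under code/
archive = workdir / "model.tar.gz"
with tarfile.open(archive, "w:gz") as tar:
    tar.add(workdir / "model.joblib", arcname="model.joblib")
    tar.add(code_dir, arcname="code")

with tarfile.open(archive, "r:gz") as tar:
    members = sorted(m.name for m in tar.getmembers())
print(members)  # ['code', 'code/inference.py', 'model.joblib']
```

This is the layout the serving container expects when SAGEMAKER_SUBMIT_DIRECTORY points at /opt/ml/model/code.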
Important for pipelines: The output_fn must return a tuple (data, content_type) to explicitly set the content type passed to the next container. Without this, intermediate containers receive application/json as the default accept type.
# Create SKLearn inference script
sklearn_inference_dir = tempfile.mkdtemp()
sklearn_inference_script = '''import joblib, os
import numpy as np
def model_fn(model_dir):
return joblib.load(os.path.join(model_dir, "model.joblib"))
def input_fn(request_body, request_content_type):
if request_content_type == "text/csv":
return np.array([[float(x) for x in line.split(",")] for line in request_body.strip().split("\\n")])
raise ValueError(f"Unsupported content type: {request_content_type}")
def predict_fn(input_data, model):
return model.transform(input_data)
def output_fn(prediction, accept):
# Always return CSV with explicit content-type for pipeline compatibility
csv_output = "\\n".join([",".join([str(x) for x in row]) for row in prediction])
return csv_output, "text/csv"
'''
with open(os.path.join(sklearn_inference_dir, 'inference.py'), 'w') as f:
f.write(sklearn_inference_script)
print(f"SKLearn inference script: {sklearn_inference_dir}")
# Repack SKLearn model with inference code using repack_model utility
sklearn_repacked_uri = f"s3://{bucket}/{prefix}/sklearn/repacked/model.tar.gz"
repack_model(
inference_script="inference.py",
source_directory=sklearn_inference_dir,
dependencies=[],
model_uri=sklearn_model_uri,
repacked_model_uri=sklearn_repacked_uri,
sagemaker_session=sagemaker_session
)
print(f"Repacked SKLearn model: {sklearn_repacked_uri}")
# XGBoost uses built-in inference - no custom script needed for basic CSV input/output
# The XGBoost container handles text/csv natively
xgboost_repacked_uri = xgboost_model_uri
print(f"XGBoost model (no repack needed): {xgboost_repacked_uri}")
Step 5: Deploy with ModelBuilder (Recommended)#
ModelBuilder provides a simplified deployment experience for inference pipelines. This is the recommended approach for most use cases.
How it works:
Create individual Model objects using Model.create() with primary_container
Pass the list of models to ModelBuilder(model=[model1, model2, ...])
Call build() to create the pipeline model
Call deploy() to create the endpoint
Note: Each Model must use primary_container (not containers). ModelBuilder extracts the container definitions and combines them into a pipeline.
# Get inference images
sklearn_inference_image = retrieve(
framework="sklearn", region=region, version="1.4-2"
)
xgboost_inference_image = retrieve(
framework="xgboost", region=region, version="3.0-5"
)
print(f"SKLearn inference image: {sklearn_inference_image}")
print(f"XGBoost inference image: {xgboost_inference_image}")
# Create individual Model objects for each container
sklearn_model_name = f"sklearn-model-{unique_id}"
xgboost_model_name = f"xgboost-model-{unique_id}"
# SKLearn preprocessing model
sklearn_model = Model.create(
model_name=sklearn_model_name,
primary_container=ContainerDefinition(
image=sklearn_inference_image,
model_data_url=sklearn_repacked_uri,
environment={
"SAGEMAKER_PROGRAM": "inference.py",
"SAGEMAKER_SUBMIT_DIRECTORY": "/opt/ml/model/code"
}
),
execution_role_arn=role
)
# XGBoost inference model
xgboost_model = Model.create(
model_name=xgboost_model_name,
primary_container=ContainerDefinition(
image=xgboost_inference_image,
model_data_url=xgboost_repacked_uri
),
execution_role_arn=role
)
print(f"Created sklearn model: {sklearn_model_name}")
print(f"Created xgboost model: {xgboost_model_name}")
# Create ModelBuilder with list of Models for inference pipeline
pipeline_builder = ModelBuilder(
model=[sklearn_model, xgboost_model],
role_arn=role,
sagemaker_session=sagemaker_session
)
# Build the pipeline model
pipeline_model_mb = pipeline_builder.build()
print(f"Pipeline model built: {pipeline_model_mb.model_name}")
# Deploy using ModelBuilder
endpoint_name_mb = f"pipeline-mb-{unique_id}"
endpoint_mb = pipeline_builder.deploy(
endpoint_name=endpoint_name_mb,
instance_type="ml.m5.large",
initial_instance_count=1
)
print(f"Endpoint deployed: {endpoint_name_mb}")
# Test the ModelBuilder-deployed endpoint
test_samples = test_df.iloc[:5, :4].values
test_labels = test_df.iloc[:5, 4].values
csv_data = "\n".join([",".join([str(x) for x in row]) for row in test_samples])
response = endpoint_mb.invoke(
body=csv_data,
content_type="text/csv",
accept="text/csv"
)
result = response.body.read().decode('utf-8')
predictions = [float(x) for x in result.strip().split('\n')]
print("ModelBuilder Pipeline Results:")
print(f"Predictions: {predictions}")
print(f"Binary: {[1 if p > 0.5 else 0 for p in predictions]}")
print(f"Actual: {list(test_labels)}")
# Clean up ModelBuilder resources
try:
endpoint_mb.delete()
print(f"Deleted endpoint: {endpoint_name_mb}")
except Exception as e:
print(f"Error: {e}")
try:
sklearn_model.delete()
xgboost_model.delete()
pipeline_model_mb.delete()
print("Deleted models")
except Exception as e:
print(f"Error: {e}")
Alternative: Low-level API Deployment#
This section demonstrates the low-level approach using Model.create() with multiple ContainerDefinition objects. Use this when you need fine-grained control over the deployment configuration.
Key parameters:
containers: List of ContainerDefinition objects executed in order
container_hostname: Identifies each container in logs and metrics
inference_execution_config: Set to Serial for pipeline execution
environment: Must include SAGEMAKER_PROGRAM and SAGEMAKER_SUBMIT_DIRECTORY for custom inference scripts
# Create inference pipeline model
pipeline_model_name = f"pipeline-model-{unique_id}"
pipeline_model = Model.create(
model_name=pipeline_model_name,
containers=[
ContainerDefinition(
container_hostname="preprocessing",
image=sklearn_inference_image,
model_data_url=sklearn_repacked_uri,
environment={
"SAGEMAKER_PROGRAM": "inference.py",
"SAGEMAKER_SUBMIT_DIRECTORY": "/opt/ml/model/code"
}
),
ContainerDefinition(
container_hostname="inference",
image=xgboost_inference_image,
model_data_url=xgboost_repacked_uri
)
],
inference_execution_config=InferenceExecutionConfig(mode="Serial"),
execution_role_arn=role
)
print(f"Pipeline model created: {pipeline_model.model_name}")
Deploy the Inference Pipeline#
Deployment requires creating an EndpointConfig and then an Endpoint. This is the low-level approach that gives you full control over the deployment configuration.
# Create endpoint config and endpoint
endpoint_config_name = f"pipeline-config-{unique_id}"
endpoint_name = f"pipeline-endpoint-{unique_id}"
endpoint_config = EndpointConfig.create(
endpoint_config_name=endpoint_config_name,
production_variants=[
ProductionVariant(
variant_name="AllTraffic",
model_name=pipeline_model_name,
initial_instance_count=1,
instance_type="ml.m5.large"
)
]
)
endpoint = Endpoint.create(
endpoint_name=endpoint_name,
endpoint_config_name=endpoint_config_name
)
print(f"Creating endpoint: {endpoint_name}")
endpoint.wait_for_status(target_status="InService")
print(f"Endpoint ready: {endpoint_name}")
Test the Inference Pipeline#
When invoking the pipeline:
Your input goes to Container 1 (SKLearn preprocessing)
Container 1’s output automatically flows to Container 2 (XGBoost)
Container 2’s output is returned as the final response
The content_type you specify applies to Container 1’s input, and accept applies to Container 2’s output.
# Test inference
test_samples = test_df.iloc[:5, :4].values
test_labels = test_df.iloc[:5, 4].values
csv_data = "\n".join([",".join([str(x) for x in row]) for row in test_samples])
response = endpoint.invoke(
body=csv_data,
content_type="text/csv",
accept="text/csv"
)
result = response.body.read().decode('utf-8')
predictions = [float(x) for x in result.strip().split('\n')]
print("Pipeline Inference Results:")
print(f"Predictions (probabilities): {predictions}")
print(f"Binary predictions: {[1 if p > 0.5 else 0 for p in predictions]}")
print(f"Actual labels: {list(test_labels)}")
Clean Up#
Delete resources in reverse order of creation: Endpoint → EndpointConfig → Model.
# Clean up resources
print("Cleaning up...")
try:
endpoint.delete()
print(f"Deleted endpoint: {endpoint_name}")
except Exception as e:
print(f"Error: {e}")
try:
endpoint_config.delete()
print(f"Deleted endpoint config: {endpoint_config_name}")
except Exception as e:
print(f"Error: {e}")
try:
pipeline_model.delete()
print(f"Deleted model: {pipeline_model_name}")
except Exception as e:
print(f"Error: {e}")
print("Cleanup completed!")
Summary#
This notebook demonstrated two approaches for deploying inference pipelines in SageMaker V3.
Approach 1: ModelBuilder (Recommended)#
| Step | API | Description |
|---|---|---|
| Training | ModelTrainer | High-level training with SourceCode and InputData |
| Repacking | repack_model | Adds inference code to model artifacts |
| Models | Model.create() with primary_container | Individual models per container |
| Deploy | ModelBuilder.build() + deploy() | Single call for build + deploy |
Approach 2: Low-level APIs (Full Control)#
| Step | API | Description |
|---|---|---|
| Training | ModelTrainer | Same as above |
| Repacking | repack_model | Same as above |
| Model | Model.create() with containers | Creates multi-container pipeline model |
| Deploy | EndpointConfig.create() + Endpoint.create() | Explicit endpoint configuration |
Key Concepts#
Training vs Inference Code Separation:
Training scripts focus on model fitting
Inference logic added via repack_model
Pipeline Data Flow:
content_type in invoke() → applies to first container’s input
accept in invoke() → applies to last container’s output
Intermediate data: controlled by output_fn return value
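The serial flow can be mimicked locally in plain Python. The means/stds and the logistic scorer below are illustrative stand-ins, not the trained models; the point is the handoff, where container 1’s CSV output becomes container 2’s input:

```python
import math

# Hypothetical training-set statistics for the four features.
MEANS = [100.0, 50.0, 0.5, 1000.0]
STDS = [15.0, 10.0, 0.1, 200.0]

def scale(csv_body: str) -> str:
    """Container 1 stand-in: parse CSV, standardize, emit CSV (the output_fn contract)."""
    rows = [[float(x) for x in line.split(",")] for line in csv_body.strip().split("\n")]
    scaled = [[(v - m) / s for v, m, s in zip(row, MEANS, STDS)] for row in rows]
    return "\n".join(",".join(f"{v:.4f}" for v in row) for row in scaled)

def classify(csv_body: str) -> str:
    """Container 2 stand-in: toy logistic score in place of the XGBoost model."""
    rows = [[float(x) for x in line.split(",")] for line in csv_body.strip().split("\n")]
    return "\n".join(str(round(1 / (1 + math.exp(-sum(row))), 4)) for row in rows)

# Serial execution: content_type governs scale()'s input, accept governs
# classify()'s output; the intermediate CSV is whatever output_fn returns.
request = "115,55,0.5,1200\n85,45,0.5,900"
predictions = [float(x) for x in classify(scale(request)).split("\n")]
print(predictions)  # first row scores high, second low
```

On a real endpoint this chaining happens inside SageMaker; you only see the first container’s input and the last container’s output.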
When to Use Each Approach#
ModelBuilder: Quick deployment, recommended for most use cases
Low-level APIs: Fine-grained control over endpoint configuration