# V3 Imports
from sagemaker.train import ModelTrainer
from sagemaker.train.configs import Compute, SourceCode, InputData, StoppingCondition
from sagemaker.train.tuner import HyperparameterTuner
from sagemaker.core.parameter import ContinuousParameter, CategoricalParameter
from sagemaker.core.helper.session_helper import get_execution_role
from sagemaker.mlops.workflow.steps import TuningStep
from sagemaker.mlops.workflow.model_step import ModelStep
from sagemaker.serve.model_builder import ModelBuilder
from sagemaker.core.workflow.pipeline_context import PipelineSession
# Initialize SageMaker session
pipeline_session = PipelineSession()
region = pipeline_session.boto_region_name
default_bucket = pipeline_session.default_bucket()
# Role Configuration
# Option 1: Auto-detect (works in SageMaker Studio/Notebook instances)
# Option 2: Manually specify your SageMaker execution role ARN
try:
role = get_execution_role()
print(f"✓ Auto-detected role: {role}")
except Exception as e:
print(f"⚠️ Could not auto-detect role: {e}")
# Manually specify your SageMaker execution role ARN here:
role = "<IAM Role ARN>"
print(f"✓ Using manually specified role: {role}")
# Define prefixes for organization
prefix = "v3-tuning"
base_job_prefix = "pytorch-mnist-hpo"
# Configuration
training_instance_type = "ml.m5.xlarge"
account_id = pipeline_session.account_id()
local_dir = "data"
print(f"\nRegion: {region}")
print(f"Role: {role}")
print(f"Bucket: {default_bucket}")
print(f"Prefix: {prefix}")
## Download Data
# Download MNIST dataset
from torchvision.datasets import MNIST
from torchvision import transforms
MNIST.mirrors = [
f"https://sagemaker-example-files-prod-{region}.s3.amazonaws.com/datasets/image/MNIST/"
]
print("Downloading MNIST dataset...")
MNIST(
local_dir,
download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]),
)
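As an optional sanity check, the cached dataset can be reloaded and its size confirmed (a sketch assuming the download above completed):
# Reload from the local cache and confirm the expected number of examples.
train_set = MNIST(
    local_dir,
    train=True,
    download=False,  # already cached by the cell above
    transform=transforms.ToTensor(),
)
print(f"Training examples: {len(train_set)}")  # the MNIST train split has 60,000 images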
# Upload to S3
s3_data_uri = pipeline_session.upload_data(
path=local_dir,
bucket=default_bucket,
key_prefix=f"{prefix}/data"
)
print(f"Training data uploaded to: {s3_data_uri}")
## Tune Hyperparameters
# Configure source code
source_code = SourceCode(
source_dir=".", # Current directory containing mnist.py
entry_script="mnist.py"
)
# Configure compute resources
compute = Compute(
instance_type=training_instance_type,
instance_count=1,
volume_size_in_gb=30
)
# Configure stopping condition
stopping_condition = StoppingCondition(
max_runtime_in_seconds=3600 # 1 hour
)
# Get the PyTorch training image (a GPU build; it also runs on CPU instances
# such as ml.m5, though a matching -cpu- tag would be a leaner choice)
training_image = f"763104351884.dkr.ecr.{region}.amazonaws.com/pytorch-training:1.10.0-gpu-py38"
# Create ModelTrainer
model_trainer = ModelTrainer(
training_image=training_image,
source_code=source_code,
compute=compute,
stopping_condition=stopping_condition,
hyperparameters={
"epochs": 1, # Use 1 epoch for faster tuning
"backend": "gloo"
},
sagemaker_session=pipeline_session,
role=role,
base_job_name=base_job_prefix
)
print("ModelTrainer configured successfully")
print(f"Training Image: {training_image}")
print(f"Instance Type: {training_instance_type}")
# Define hyperparameter ranges to tune
hyperparameter_ranges = {
"lr": ContinuousParameter(0.001, 0.1),
"batch-size": CategoricalParameter([32, 64, 128, 256, 512]),
}
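Learning rates span orders of magnitude, so a logarithmic scale usually samples the range more evenly than the default linear scale. In the v2 SDK this was expressed with scaling_type="Logarithmic"; assuming the v3 ContinuousParameter keeps that argument, the range would read:
# Assumption: ContinuousParameter still accepts scaling_type, as in the v2 SDK.
hyperparameter_ranges_log = {
    "lr": ContinuousParameter(0.001, 0.1, scaling_type="Logarithmic"),
    "batch-size": CategoricalParameter([32, 64, 128, 256, 512]),
}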
# Define objective metric
objective_metric_name = "average test loss"
objective_type = "Minimize"
# Define metric definitions
metric_definitions = [
{
"Name": "average test loss",
"Regex": "Test set: Average loss: ([0-9\\.]+)"
}
]
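The regex can be exercised locally against a log line in the shape the training script is expected to emit:
import re
# Check that the metric regex captures the loss value from a sample log line.
sample_line = "Test set: Average loss: 0.0450, Accuracy: 9192/10000 (92%)"
match = re.search(metric_definitions[0]["Regex"], sample_line)
print(match.group(1) if match else "no match")  # -> 0.0450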
# Create HyperparameterTuner
tuner = HyperparameterTuner(
model_trainer=model_trainer,
objective_metric_name=objective_metric_name,
hyperparameter_ranges=hyperparameter_ranges,
metric_definitions=metric_definitions,
max_jobs=3,
max_parallel_jobs=2,
strategy="Random",
objective_type=objective_type,
early_stopping_type="Auto"
)
print("HyperparameterTuner configured successfully")
# Prepare input data
training_data = InputData(
channel_name="training",
data_source=s3_data_uri
)
# Build the tuning step. With a PipelineSession, tune() does not launch a
# job immediately; it returns step arguments for the pipeline to execute later.
tuner_run_args = tuner.tune(
    inputs=[training_data],
    wait=False
)
step_tuning = TuningStep(
    name="HPTuning",
    step_args=tuner_run_args,
)
print("Tuning step configured: HPTuning")
## Deploy the Best Tuned Model
model_builder = ModelBuilder(
image_uri=training_image,
s3_model_data_url=step_tuning.get_top_model_s3_uri(
top_k=0, s3_bucket=default_bucket, prefix=base_job_prefix
),
sagemaker_session=pipeline_session,
role_arn=role,
)
step_create_best = ModelStep(
name="CreateBestModel",
step_args=model_builder.build(),
)
from sagemaker.mlops.workflow.pipeline import Pipeline
pipeline = Pipeline(
name="pipeline-v3",
steps=[step_tuning, step_create_best],
sagemaker_session=pipeline_session,
)
# This step is slow because the source directory is uploaded to S3.
pipeline.definition()
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.describe()['PipelineExecutionStatus']
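Pipeline executions run asynchronously, so the status above will typically read Executing at first. A simple polling loop with the boto3 SageMaker client tracks it to completion (a sketch assuming the execution object exposes its ARN as execution.arn, as in earlier SDK versions):
import time
import boto3

sm = boto3.client("sagemaker", region_name=region)
# Poll until the execution leaves the Executing state.
while True:
    status = sm.describe_pipeline_execution(
        PipelineExecutionArn=execution.arn  # assumption: .arn as in the v2 SDK
    )["PipelineExecutionStatus"]
    print(f"Pipeline status: {status}")
    if status != "Executing":
        break
    time.sleep(60)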