# V3 Imports
from sagemaker.train import ModelTrainer
from sagemaker.train.configs import Compute, SourceCode, InputData, StoppingCondition
from sagemaker.train.tuner import HyperparameterTuner
from sagemaker.core.parameter import ContinuousParameter, CategoricalParameter
from sagemaker.core.helper.session_helper import get_execution_role
from sagemaker.mlops.workflow.steps import TuningStep
from sagemaker.mlops.workflow.model_step import ModelStep
from sagemaker.serve.model_builder import ModelBuilder
from sagemaker.core.workflow.pipeline_context import PipelineSession
# Build the pipeline session and pull region / default bucket from it.
pipeline_session = PipelineSession()
region = pipeline_session.boto_region_name
default_bucket = pipeline_session.default_bucket()

# Execution role: try auto-detection first (works in SageMaker Studio and
# notebook instances); fall back to a manually supplied ARN otherwise.
try:
    role = get_execution_role()
except Exception as e:
    print(f"⚠️  Could not auto-detect role: {e}")
    # Manually specify your SageMaker execution role ARN here:
    role = "<IAM Role ARN>"
    print(f"✓ Using manually specified role: {role}")
else:
    print(f"✓ Auto-detected role: {role}")

# S3 prefixes used to organize artifacts and job names.
prefix = "v3-tuning"
base_job_prefix = "pytorch-mnist-hpo"

# Training configuration.
training_instance_type = "ml.m5.xlarge"
account_id = pipeline_session.account_id()
local_dir = "data"

# Echo the resolved configuration (single print, same output as four prints).
print(f"\nRegion: {region}\nRole: {role}\nBucket: {default_bucket}\nPrefix: {prefix}")
# Download Data

# Fetch the MNIST dataset via torchvision, staging it locally before the
# upload to the pipeline's default S3 bucket.
from torchvision import transforms
from torchvision.datasets import MNIST

# Point torchvision at the region-local SageMaker example-files mirror.
MNIST.mirrors = [
    f"https://sagemaker-example-files-prod-{region}.s3.amazonaws.com/datasets/image/MNIST/"
]

# Standard MNIST normalization constants (mean / std of the training set).
mnist_transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)

print("Downloading MNIST dataset...")
MNIST(local_dir, download=True, transform=mnist_transform)

# Stage the downloaded files under s3://<default_bucket>/<prefix>/data.
s3_data_uri = pipeline_session.upload_data(
    path=local_dir, bucket=default_bucket, key_prefix=f"{prefix}/data"
)
print(f"Training data uploaded to: {s3_data_uri}")

# Tune Hyperparameters

# Training-job configuration: where the code lives, what hardware runs it,
# and when to give up.
source_code = SourceCode(
    source_dir=".",  # current directory must contain mnist.py
    entry_script="mnist.py",
)

compute = Compute(
    instance_type=training_instance_type,
    instance_count=1,
    volume_size_in_gb=30,
)

stopping_condition = StoppingCondition(max_runtime_in_seconds=3600)  # 1 hour

# Region-specific PyTorch deep-learning-container training image.
training_image = f"763104351884.dkr.ecr.{region}.amazonaws.com/pytorch-training:1.10.0-gpu-py38"

# Static hyperparameters shared by every trial; the tuner supplies the
# searchable ones on top of these.
static_hyperparameters = {
    "epochs": 1,  # Use 1 epoch for faster tuning
    "backend": "gloo",
}

model_trainer = ModelTrainer(
    training_image=training_image,
    source_code=source_code,
    compute=compute,
    stopping_condition=stopping_condition,
    hyperparameters=static_hyperparameters,
    sagemaker_session=pipeline_session,
    role=role,
    base_job_name=base_job_prefix,
)

print("ModelTrainer configured successfully")
print(f"Training Image: {training_image}")
print(f"Instance Type: {training_instance_type}")
# Search space: learning rate swept continuously, batch size chosen from a
# fixed menu of powers of two.
hyperparameter_ranges = {
    "lr": ContinuousParameter(0.001, 0.1),
    "batch-size": CategoricalParameter([32, 64, 128, 256, 512]),
}

# The tuner minimizes the test loss scraped from the training-job logs with
# the regex below.
objective_metric_name = "average test loss"
objective_type = "Minimize"

metric_definitions = [
    {"Name": "average test loss", "Regex": "Test set: Average loss: ([0-9\\.]+)"}
]

tuner = HyperparameterTuner(
    model_trainer=model_trainer,
    objective_metric_name=objective_metric_name,
    hyperparameter_ranges=hyperparameter_ranges,
    metric_definitions=metric_definitions,
    max_jobs=3,           # total trials across the search
    max_parallel_jobs=2,  # trials run concurrently
    strategy="Random",
    objective_type=objective_type,
    early_stopping_type="Auto",
)

print("HyperparameterTuner configured successfully")
# Wire the uploaded S3 data into a named input channel for the tuning job.
training_data = InputData(
    channel_name="training",
    data_source=s3_data_uri,
)

print("Starting hyperparameter tuning job...")
# With a PipelineSession, this call produces step arguments for the pipeline
# rather than launching a job immediately.
tuner_run_args = tuner.tune(inputs=[training_data], wait=False)

step_tuning = TuningStep(
    name="HPTuning",
    step_args=tuner_run_args,
)


# NOTE(review): reads a private attribute of the tuner; presumably the job
# name assigned by .tune(), but it may be unset until a job actually starts —
# verify against the SDK version in use.
tuning_job_name = tuner._current_job_name
print(f"\nTuning job started: {tuning_job_name}")

# Deploy best tuned model

# Point a ModelBuilder at the artifacts of the best (top-1) training job
# produced by the tuning step, then register it as a pipeline model step.
best_model_s3_uri = step_tuning.get_top_model_s3_uri(
    top_k=0, s3_bucket=default_bucket, prefix=base_job_prefix
)

model_builder = ModelBuilder(
    image_uri=training_image,
    s3_model_data_url=best_model_s3_uri,
    sagemaker_session=pipeline_session,
    role_arn=role,
)

step_create_best = ModelStep(
    name="CreateBestModel",
    step_args=model_builder.build(),
)
from sagemaker.mlops.workflow.pipeline import Pipeline

# Assemble the two-step pipeline: tune, then create the best model.
pipeline = Pipeline(
    name="pipeline-v3",
    steps=[step_tuning, step_create_best],
    sagemaker_session=pipeline_session,
)
# This step is slow because the source directory will be uploaded to S3.
pipeline.definition()
# Create or update the pipeline definition in SageMaker, then start a run.
pipeline.upsert(role_arn=role)
execution = pipeline.start()
# Bug fix: the original trailing bare expression discarded its value when run
# as a script (it only displayed in a notebook) — print the status instead.
status = execution.describe()["PipelineExecutionStatus"]
print(f"Pipeline execution status: {status}")