SageMaker V3 Local Training Example#

This notebook demonstrates how to use SageMaker V3 ModelTrainer in Local Container mode for testing training jobs in Docker containers locally. Note: This notebook will not run in SageMaker Studio.

import os
import subprocess
import tempfile
import shutil
import numpy as np

from sagemaker.train.model_trainer import ModelTrainer, Mode
from sagemaker.train.configs import SourceCode, Compute, InputData
from sagemaker.core.helper.session_helper import Session
# NOTE: Local mode requires Docker to be installed and running.
import os
import sys
if sys.platform == 'darwin':
    os.environ['PATH'] = '/usr/local/bin:/Applications/Docker.app/Contents/Resources/bin:/Applications/Docker.app/Contents/Resources/cli-plugins:' + os.environ['PATH']

Step 1: Setup Session and Create Test Data#

Initialize the SageMaker session and create the necessary test data and training script.

sagemaker_session = Session()
region = sagemaker_session.boto_region_name

# Get the correct ECR image for your region
from sagemaker.core import image_uris
DEFAULT_CPU_IMAGE = image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="2.0.0",
    py_version="py310",
    instance_type="ml.m5.xlarge",
    image_scope="training"
)

# Set Docker platform for Apple Silicon compatibility
import platform
if platform.machine() == 'arm64':
    os.environ['DOCKER_DEFAULT_PLATFORM'] = 'linux/amd64'

# Create temporary directories
temp_dir = tempfile.mkdtemp()
source_dir = os.path.join(temp_dir, "source")
data_dir = os.path.join(temp_dir, "data")
train_dir = os.path.join(data_dir, "train")
test_dir = os.path.join(data_dir, "test")

os.makedirs(source_dir, exist_ok=True)
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

print(f"Created temporary directories in: {temp_dir}")

Step 2: Create Training Data and Scripts#

Generate the test data and training scripts needed for local container training.

# Create test data
np.random.seed(42)
x_train = np.random.randn(100, 4).astype(np.float32)
y_train = np.random.randn(100).astype(np.float32)
x_test = np.random.randn(20, 4).astype(np.float32)
y_test = np.random.randn(20).astype(np.float32)

np.save(os.path.join(train_dir, "x_train.npy"), x_train)
np.save(os.path.join(train_dir, "y_train.npy"), y_train)
np.save(os.path.join(test_dir, "x_test.npy"), x_test)
np.save(os.path.join(test_dir, "y_test.npy"), y_test)

print(f"Created training data: {x_train.shape}, {y_train.shape}")
print(f"Created test data: {x_test.shape}, {y_test.shape}")
# Create pytorch model definition
pytorch_model_def = '''
import torch
import torch.nn as nn

def get_model():
    return nn.Sequential(
        nn.Linear(4, 10),
        nn.ReLU(),
        nn.Linear(10, 1)
    )
'''

with open(os.path.join(source_dir, "pytorch_model_def.py"), 'w') as f:
    f.write(pytorch_model_def)

print("Created pytorch_model_def.py")
# Create training script
training_script = '''
import argparse
import numpy as np
import os
import sys
import logging
import json
import shutil
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from pytorch_model_def import get_model

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
current_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = "/opt/ml/input/data"

def get_train_data(train_dir):
    x_train = np.load(os.path.join(train_dir, "x_train.npy"))
    y_train = np.load(os.path.join(train_dir, "y_train.npy"))
    logger.info(f"x train: {x_train.shape}, y train: {y_train.shape}")
    return torch.from_numpy(x_train), torch.from_numpy(y_train)

def get_test_data(test_dir):
    x_test = np.load(os.path.join(test_dir, "x_test.npy"))
    y_test = np.load(os.path.join(test_dir, "y_test.npy"))
    logger.info(f"x test: {x_test.shape}, y test: {y_test.shape}")
    return torch.from_numpy(x_test), torch.from_numpy(y_test)

def train():
    train_dir = os.path.join(data_dir, "train")
    test_dir = os.path.join(data_dir, "test")
    model_dir = os.environ.get("SM_MODEL_DIR", os.path.join(current_dir, "data/model"))

    x_train, y_train = get_train_data(train_dir)
    x_test, y_test = get_test_data(test_dir)
    train_ds = TensorDataset(x_train, y_train)

    batch_size = 64
    epochs = 1
    learning_rate = 0.1
    logger.info(f"batch_size = {batch_size}, epochs = {epochs}, learning rate = {learning_rate}")

    train_dl = DataLoader(train_ds, batch_size, shuffle=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = get_model().to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        for x_train_batch, y_train_batch in train_dl:
            y = model(x_train_batch.float())
            loss = criterion(y.flatten(), y_train_batch.float())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        epoch += 1
        logger.info(f"epoch: {epoch} -> loss: {loss}")

    with torch.no_grad():
        y = model(x_test.float()).flatten()
        mse = ((y - y_test) ** 2).sum() / y_test.shape[0]
    print("Test MSE:", mse.numpy())

    os.makedirs(model_dir, exist_ok=True)
    torch.save(model.state_dict(), model_dir + "/model.pth")
    
    inference_code_path = model_dir + "/code/"
    if not os.path.exists(inference_code_path):
        os.mkdir(inference_code_path)
        logger.info(f"Created a folder at {inference_code_path}!")

    shutil.copy("local_training_script.py", inference_code_path)
    shutil.copy("pytorch_model_def.py", inference_code_path)
    logger.info(f"Saving models files to {inference_code_path}")

if __name__ == "__main__":
    print("Running the training job ...")
    train()
'''

with open(os.path.join(source_dir, "local_training_script.py"), 'w') as f:
    f.write(training_script)

print("Created local_training_script.py")

Step 3: Configure Local Container Training#

Set up ModelTrainer to run in LOCAL_CONTAINER mode.

source_code = SourceCode(
    source_dir=source_dir,
    entry_script="local_training_script.py",
)

compute = Compute(
    instance_type="local_cpu",
    instance_count=1,
)

train_data = InputData(
    channel_name="train",
    data_source=train_dir,
)

test_data = InputData(
    channel_name="test",
    data_source=test_dir,
)

print("Configuration complete")

Step 4: Create ModelTrainer#

Initialize ModelTrainer with the local container configuration.

model_trainer = ModelTrainer(
    training_image=DEFAULT_CPU_IMAGE,
    sagemaker_session=sagemaker_session,
    source_code=source_code,
    compute=compute,
    input_data_config=[train_data, test_data],
    base_job_name="local_mode_single_container_local_data",
    training_mode=Mode.LOCAL_CONTAINER,
)

print("ModelTrainer created with LOCAL_CONTAINER mode!")

Step 5: Run Local Container Training#

Start the training job in local Docker container.

print("Starting local container training...")

try:
    model_trainer.train()
    print("Local container training completed successfully!")
    operation_successful = True
except Exception as e:
    print(f"Training failed with error: {e}")
    operation_successful = False

Step 6: Check Training Results#

Examine the training artifacts created by local container training.

if operation_successful:
    current_dir = os.getcwd()
    
    directories_to_check = [
        "compressed_artifacts",
        "artifacts", 
        "model",
        "output",
    ]
    
    print("Training Results:")
    for directory in directories_to_check:
        path = os.path.join(current_dir, directory)
        if os.path.exists(path):
            print(f"✓ {directory}: Found")
            if os.path.isdir(path):
                files = os.listdir(path)
                print(f"  Contents: {files}")
        else:
            print(f"✗ {directory}: Not found")
    
    print("Local container training completed successfully!")
else:
    print("Training was not successful.")

Step 7: Clean Up#

Clean up local artifacts and temporary files.

try:
    subprocess.run(["docker", "compose", "down", "-v"], check=False)
    print("Docker containers stopped")
except Exception:
    pass

# Clean up temporary files
try:
    shutil.rmtree(temp_dir)
    print(f"Cleaned up temporary directory: {temp_dir}")
except Exception as e:
    print(f"Could not clean up temp directory: {e}")

# Clean up training artifacts
current_dir = os.getcwd()
directories = ["compressed_artifacts", "artifacts", "model", "output"]

for directory in directories:
    path = os.path.join(current_dir, directory)
    if os.path.exists(path):
        try:
            shutil.rmtree(path)
            print(f"Cleaned up: {directory}")
        except Exception as e:
            print(f"Could not clean up {directory}: {e}")

print("Cleanup completed!")

Summary#

This notebook demonstrated:

  1. Local container training: Running training in Docker containers locally

  2. Data preparation: Creating test data and training scripts

  3. Artifact management: Checking and cleaning up training artifacts

  4. Docker integration: Proper container lifecycle management

Local container training provides a great way to test training jobs locally before deploying to SageMaker cloud instances.