SageMaker V3 Distributed Local Training Example#

This notebook demonstrates how to run distributed training locally using SageMaker V3 ModelTrainer with multiple Docker containers. Note: This notebook will not run in SageMaker Studio.

import os
import subprocess
import tempfile
import shutil
import numpy as np

from sagemaker.train.model_trainer import ModelTrainer, Mode
from sagemaker.train.configs import SourceCode, Compute, InputData
from sagemaker.train.distributed import Torchrun
from sagemaker.core.helper.session_helper import Session
from sagemaker.core import image_uris
# NOTE: Local mode requires Docker to be installed and running.
import os
os.environ['PATH'] = '/usr/local/bin:/Applications/Docker.app/Contents/Resources/bin:' + os.environ['PATH']

Step 1: Setup Session and Create Test Data#

Initialize the SageMaker session and create the necessary test data and training script.

sagemaker_session = Session()
region = sagemaker_session.boto_region_name

DEFAULT_CPU_IMAGE = image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="2.0.0",
    py_version="py310",
    instance_type="ml.m5.xlarge",
    image_scope="training"
)

# Create temporary directories
temp_dir = tempfile.mkdtemp()
source_dir = os.path.join(temp_dir, "source")
data_dir = os.path.join(temp_dir, "data")
train_dir = os.path.join(data_dir, "train")
test_dir = os.path.join(data_dir, "test")

os.makedirs(source_dir, exist_ok=True)
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

print(f"Created temporary directories in: {temp_dir}")
print("Note: This will use multiple Docker containers locally for distributed training!")

Step 2: Create Training Data and Scripts#

Generate the test data and training scripts needed for distributed local training.

# Create test data
np.random.seed(42)
x_train = np.random.randn(100, 4).astype(np.float32)
y_train = np.random.randn(100).astype(np.float32)
x_test = np.random.randn(20, 4).astype(np.float32)
y_test = np.random.randn(20).astype(np.float32)

np.save(os.path.join(train_dir, "x_train.npy"), x_train)
np.save(os.path.join(train_dir, "y_train.npy"), y_train)
np.save(os.path.join(test_dir, "x_test.npy"), x_test)
np.save(os.path.join(test_dir, "y_test.npy"), y_test)

print(f"Created training data: {x_train.shape}, {y_train.shape}")
print(f"Created test data: {x_test.shape}, {y_test.shape}")
# Create pytorch model definition
pytorch_model_def = '''
import torch
import torch.nn as nn

def get_model():
    return nn.Sequential(
        nn.Linear(4, 10),
        nn.ReLU(),
        nn.Linear(10, 1)
    )
'''

with open(os.path.join(source_dir, "pytorch_model_def.py"), 'w') as f:
    f.write(pytorch_model_def)

print("Created pytorch_model_def.py")
# Create training script (same as single container for simplicity)
training_script = '''
import argparse
import numpy as np
import os
import sys
import logging
import json
import shutil
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from pytorch_model_def import get_model

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
current_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = "/opt/ml/input/data"

def get_train_data(train_dir):
    x_train = np.load(os.path.join(train_dir, "x_train.npy"))
    y_train = np.load(os.path.join(train_dir, "y_train.npy"))
    logger.info(f"x train: {x_train.shape}, y train: {y_train.shape}")
    return torch.from_numpy(x_train), torch.from_numpy(y_train)

def get_test_data(test_dir):
    x_test = np.load(os.path.join(test_dir, "x_test.npy"))
    y_test = np.load(os.path.join(test_dir, "y_test.npy"))
    logger.info(f"x test: {x_test.shape}, y test: {y_test.shape}")
    return torch.from_numpy(x_test), torch.from_numpy(y_test)

def train():
    train_dir = os.path.join(data_dir, "train")
    test_dir = os.path.join(data_dir, "test")
    model_dir = os.environ.get("SM_MODEL_DIR", os.path.join(current_dir, "data/model"))

    x_train, y_train = get_train_data(train_dir)
    x_test, y_test = get_test_data(test_dir)
    train_ds = TensorDataset(x_train, y_train)

    batch_size = 64
    epochs = 1
    learning_rate = 0.1
    logger.info(f"batch_size = {batch_size}, epochs = {epochs}, learning rate = {learning_rate}")

    train_dl = DataLoader(train_ds, batch_size, shuffle=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = get_model().to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        for x_train_batch, y_train_batch in train_dl:
            y = model(x_train_batch.float())
            loss = criterion(y.flatten(), y_train_batch.float())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        epoch += 1
        logger.info(f"epoch: {epoch} -> loss: {loss}")

    with torch.no_grad():
        y = model(x_test.float()).flatten()
        mse = ((y - y_test) ** 2).sum() / y_test.shape[0]
    print("Test MSE:", mse.numpy())

    os.makedirs(model_dir, exist_ok=True)
    torch.save(model.state_dict(), model_dir + "/model.pth")
    
    inference_code_path = model_dir + "/code/"
    if not os.path.exists(inference_code_path):
        os.mkdir(inference_code_path)
        logger.info(f"Created a folder at {inference_code_path}!")

    shutil.copy("local_training_script.py", inference_code_path)
    shutil.copy("pytorch_model_def.py", inference_code_path)
    logger.info(f"Saving models files to {inference_code_path}")

if __name__ == "__main__":
    print("Running the training job ...")
    train()
'''

with open(os.path.join(source_dir, "local_training_script.py"), 'w') as f:
    f.write(training_script)

print("Created local_training_script.py")

Step 3: Configure Distributed Local Training#

Set up ModelTrainer for distributed training in local container mode.

source_code = SourceCode(
    source_dir=source_dir,
    entry_script="local_training_script.py",
)

distributed = Torchrun(
    process_count_per_node=1,
)

compute = Compute(
    instance_type="local_cpu",
    instance_count=2,
)

train_data = InputData(
    channel_name="train",
    data_source=train_dir,
)

test_data = InputData(
    channel_name="test",
    data_source=test_dir,
)

print("Distributed Local Training Configuration:")
print(f"  Containers: {compute.instance_count}")
print(f"  Processes per container: {distributed.process_count_per_node}")
print(f"  Total processes: {compute.instance_count * distributed.process_count_per_node}")
print(f"  Distributed framework: Torchrun")

Step 4: Create Distributed ModelTrainer#

Initialize ModelTrainer for distributed local container training.

model_trainer = ModelTrainer(
    training_image=DEFAULT_CPU_IMAGE,
    sagemaker_session=sagemaker_session,
    source_code=source_code,
    distributed=distributed,
    compute=compute,
    input_data_config=[train_data, test_data],
    base_job_name="local_mode_multi_container",
    training_mode=Mode.LOCAL_CONTAINER,
)

print("Distributed ModelTrainer created successfully!")
print(f"Training mode: {Mode.LOCAL_CONTAINER}")
print(f"Distributed config: {distributed}")

Step 5: Run Distributed Local Training#

Start the distributed training job using multiple local containers.

print("Starting distributed local container training...")
print("This will launch 2 Docker containers with 1 training process each.")

try:
    model_trainer.train()
    print("Distributed local container training completed successfully!")
    operation_successful = True
except Exception as e:
    print(f"Training failed with error: {e}")
    operation_successful = False

Step 6: Check Training Results#

Examine the results from distributed training.

if operation_successful:
    current_dir = os.getcwd()
    
    print("Distributed Training Results:")
    print("=" * 35)
    
    # Check that certain directories don't exist (cleanup verification)
    assert not os.path.exists(os.path.join(current_dir, "shared"))
    assert not os.path.exists(os.path.join(current_dir, "input"))
    assert not os.path.exists(os.path.join(current_dir, "algo-1"))
    assert not os.path.exists(os.path.join(current_dir, "algo-2"))
    print("✓ Temporary directories properly cleaned up")
    
    # Check for expected artifacts
    directories_to_check = [
        "compressed_artifacts",
        "artifacts",
        "model",
        "output",
    ]
    
    for directory in directories_to_check:
        path = os.path.join(current_dir, directory)
        if os.path.exists(path):
            print(f"✓ {directory}: Found")
        else:
            print(f"✗ {directory}: Not found")
    
    print("\nDistributed Training Configuration:")
    print(f"  Training Image: {DEFAULT_CPU_IMAGE}")
    print(f"  Container Count: {compute.instance_count}")
    print(f"  Processes per Container: {distributed.process_count_per_node}")
    print(f"  Total Training Processes: {compute.instance_count * distributed.process_count_per_node}")
    
else:
    print("Training was not successful.")

Step 7: Clean Up#

Clean up local artifacts and temporary files.

try:
    subprocess.run(["docker", "compose", "down", "-v"], check=False)
    print("Docker containers stopped")
except Exception:
    pass

# Clean up temporary files
try:
    shutil.rmtree(temp_dir)
    print(f"Cleaned up temporary directory: {temp_dir}")
except Exception as e:
    print(f"Could not clean up temp directory: {e}")

# Clean up training artifacts
current_dir = os.getcwd()
directories = ["compressed_artifacts", "artifacts", "model", "output"]

for directory in directories:
    path = os.path.join(current_dir, directory)
    if os.path.exists(path):
        try:
            shutil.rmtree(path)
            print(f"Cleaned up: {directory}")
        except Exception as e:
            print(f"Could not clean up {directory}: {e}")

# Final assertion
assert operation_successful
print("\n✓ Distributed local training completed successfully!")
print("Cleanup completed - all artifacts removed.")

Summary#

This notebook demonstrated:

  1. Multi-container distributed training: Running training across multiple Docker containers locally

  2. Torchrun integration: Using SageMaker’s Torchrun distributed driver

  3. Local development workflow: Testing distributed training before cloud deployment

  4. Proper cleanup: Following cleanup patterns for local artifacts

Distributed local training provides a great way to test distributed training patterns locally before deploying to SageMaker cloud instances, with no AWS costs and realistic container-based execution.