SageMaker V3 Distributed Local Training Example
This notebook demonstrates how to run distributed training locally using SageMaker V3 ModelTrainer with multiple Docker containers. Note: This notebook will not run in SageMaker Studio.
import os
import subprocess
import tempfile
import shutil
import numpy as np
from sagemaker.train.model_trainer import ModelTrainer, Mode
from sagemaker.train.configs import SourceCode, Compute, InputData
from sagemaker.train.distributed import Torchrun
from sagemaker.core.helper.session_helper import Session
from sagemaker.core import image_uris
# NOTE: Local mode requires Docker to be installed and running.
# Prepend the common macOS Docker Desktop binary locations to PATH so the
# SDK's local-mode runner can find the `docker` CLI.
import os  # NOTE(review): redundant — `os` is already imported at the top of the file
os.environ['PATH'] = '/usr/local/bin:/Applications/Docker.app/Contents/Resources/bin:' + os.environ['PATH']
Step 1: Set Up the Session and Create Test Data
Initialize the SageMaker session and create the necessary test data and training script.
# Create a SageMaker session and resolve the standard PyTorch 2.0 CPU
# training image for the session's region.
sagemaker_session = Session()
region = sagemaker_session.boto_region_name

_image_spec = dict(
    framework="pytorch",
    region=region,
    version="2.0.0",
    py_version="py310",
    instance_type="ml.m5.xlarge",
    image_scope="training",
)
DEFAULT_CPU_IMAGE = image_uris.retrieve(**_image_spec)
# Scratch workspace: a source-code directory plus train/test data channels,
# all rooted under one throwaway temp directory (removed in the cleanup cell).
temp_dir = tempfile.mkdtemp()
source_dir = os.path.join(temp_dir, "source")
data_dir = os.path.join(temp_dir, "data")
train_dir = os.path.join(data_dir, "train")
test_dir = os.path.join(data_dir, "test")

for _needed in (source_dir, train_dir, test_dir):
    os.makedirs(_needed, exist_ok=True)

print(f"Created temporary directories in: {temp_dir}")
print("Note: This will use multiple Docker containers locally for distributed training!")
Step 2: Create Training Data and Scripts
Generate the test data and training scripts needed for distributed local training.
# Deterministic synthetic regression data: 4 features, scalar target.
# The draw order below matches the seeded RNG sequence exactly.
np.random.seed(42)
x_train = np.random.randn(100, 4).astype(np.float32)
y_train = np.random.randn(100).astype(np.float32)
x_test = np.random.randn(20, 4).astype(np.float32)
y_test = np.random.randn(20).astype(np.float32)

for filename, array, target_dir in (
    ("x_train.npy", x_train, train_dir),
    ("y_train.npy", y_train, train_dir),
    ("x_test.npy", x_test, test_dir),
    ("y_test.npy", y_test, test_dir),
):
    np.save(os.path.join(target_dir, filename), array)

print(f"Created training data: {x_train.shape}, {y_train.shape}")
print(f"Created test data: {x_test.shape}, {y_test.shape}")
# Create pytorch model definition
# This string literal is written out as pytorch_model_def.py inside the
# source dir and imported by the training script in the container: a tiny
# 4 -> 10 -> 1 MLP.
pytorch_model_def = '''
import torch
import torch.nn as nn

def get_model():
    return nn.Sequential(
        nn.Linear(4, 10),
        nn.ReLU(),
        nn.Linear(10, 1)
    )
'''
with open(os.path.join(source_dir, "pytorch_model_def.py"), 'w') as f:
    f.write(pytorch_model_def)
print("Created pytorch_model_def.py")
# Create training script (same as single container for simplicity)
# This string literal becomes local_training_script.py, the entry script
# torchrun executes inside each container. It loads the .npy channels from
# /opt/ml/input/data, trains the MLP for one epoch, evaluates MSE on the
# test split, and saves the model (plus inference code) to SM_MODEL_DIR.
# NOTE(review): the script trains independently per process -- it does not
# call dist.init_process_group or wrap the model in DDP, so the two
# containers do not actually synchronize gradients; confirm that is
# intended for this demo.
training_script = '''
import argparse
import numpy as np
import os
import sys
import logging
import json
import shutil

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

from pytorch_model_def import get_model

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

current_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = "/opt/ml/input/data"

def get_train_data(train_dir):
    x_train = np.load(os.path.join(train_dir, "x_train.npy"))
    y_train = np.load(os.path.join(train_dir, "y_train.npy"))
    logger.info(f"x train: {x_train.shape}, y train: {y_train.shape}")
    return torch.from_numpy(x_train), torch.from_numpy(y_train)

def get_test_data(test_dir):
    x_test = np.load(os.path.join(test_dir, "x_test.npy"))
    y_test = np.load(os.path.join(test_dir, "y_test.npy"))
    logger.info(f"x test: {x_test.shape}, y test: {y_test.shape}")
    return torch.from_numpy(x_test), torch.from_numpy(y_test)

def train():
    train_dir = os.path.join(data_dir, "train")
    test_dir = os.path.join(data_dir, "test")
    model_dir = os.environ.get("SM_MODEL_DIR", os.path.join(current_dir, "data/model"))

    x_train, y_train = get_train_data(train_dir)
    x_test, y_test = get_test_data(test_dir)
    train_ds = TensorDataset(x_train, y_train)

    batch_size = 64
    epochs = 1
    learning_rate = 0.1
    logger.info(f"batch_size = {batch_size}, epochs = {epochs}, learning rate = {learning_rate}")

    train_dl = DataLoader(train_ds, batch_size, shuffle=True)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = get_model().to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        for x_train_batch, y_train_batch in train_dl:
            y = model(x_train_batch.float())
            loss = criterion(y.flatten(), y_train_batch.float())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        epoch += 1
        logger.info(f"epoch: {epoch} -> loss: {loss}")

    with torch.no_grad():
        y = model(x_test.float()).flatten()
        mse = ((y - y_test) ** 2).sum() / y_test.shape[0]
        print("Test MSE:", mse.numpy())

    os.makedirs(model_dir, exist_ok=True)
    torch.save(model.state_dict(), model_dir + "/model.pth")

    inference_code_path = model_dir + "/code/"
    if not os.path.exists(inference_code_path):
        os.mkdir(inference_code_path)
        logger.info(f"Created a folder at {inference_code_path}!")

    shutil.copy("local_training_script.py", inference_code_path)
    shutil.copy("pytorch_model_def.py", inference_code_path)
    logger.info(f"Saving models files to {inference_code_path}")

if __name__ == "__main__":
    print("Running the training job ...")
    train()
'''
with open(os.path.join(source_dir, "local_training_script.py"), 'w') as f:
    f.write(training_script)
print("Created local_training_script.py")
Step 3: Configure Distributed Local Training
Set up ModelTrainer for distributed training in local container mode.
# Code to run in each container: the entry script plus its sibling module
# from source_dir.
source_code = SourceCode(
    source_dir=source_dir,
    entry_script="local_training_script.py",
)

# Torchrun launches the training processes inside each container.
distributed = Torchrun(
    process_count_per_node=1,
)

# Two "local_cpu" instances -> two Docker containers on this machine.
compute = Compute(
    instance_type="local_cpu",
    instance_count=2,
)

# Local directories are mounted as the "train"/"test" input channels
# (surfaced under /opt/ml/input/data inside the containers).
train_data = InputData(
    channel_name="train",
    data_source=train_dir,
)
test_data = InputData(
    channel_name="test",
    data_source=test_dir,
)

print("Distributed Local Training Configuration:")
print(f" Containers: {compute.instance_count}")
print(f" Processes per container: {distributed.process_count_per_node}")
print(f" Total processes: {compute.instance_count * distributed.process_count_per_node}")
# Fixed: was an f-string with no placeholders (ruff F541).
print(" Distributed framework: Torchrun")
Step 4: Create the Distributed ModelTrainer
Initialize ModelTrainer for distributed local container training.
# Build the trainer in LOCAL_CONTAINER mode: the job runs in local Docker
# containers instead of SageMaker-managed cloud instances.
model_trainer = ModelTrainer(
    training_image=DEFAULT_CPU_IMAGE,
    sagemaker_session=sagemaker_session,
    source_code=source_code,
    distributed=distributed,
    compute=compute,
    input_data_config=[train_data, test_data],
    # NOTE(review): underscores are not valid in a cloud TrainingJobName
    # (pattern allows [a-zA-Z0-9-] only) -- appears to work for local mode,
    # but rename before reusing this config for a cloud job.
    base_job_name="local_mode_multi_container",
    training_mode=Mode.LOCAL_CONTAINER,
)
print("Distributed ModelTrainer created successfully!")
print(f"Training mode: {Mode.LOCAL_CONTAINER}")
print(f"Distributed config: {distributed}")
Step 5: Run Distributed Local Training
Start the distributed training job using multiple local containers.
print("Starting distributed local container training...")
print("This will launch 2 Docker containers with 1 training process each.")
# Record success in a flag instead of letting the exception propagate, so
# the result-inspection and cleanup cells below always run; the final cell
# re-asserts the outcome.
try:
    model_trainer.train()
    print("Distributed local container training completed successfully!")
    operation_successful = True
except Exception as e:
    print(f"Training failed with error: {e}")
    operation_successful = False
Step 6: Check Training Results
Examine the results from distributed training.
if operation_successful:
    current_dir = os.getcwd()
    print("Distributed Training Results:")
    print("=" * 35)

    # Local mode should have removed its per-container scratch directories;
    # fail loudly if any survived.
    for leftover in ("shared", "input", "algo-1", "algo-2"):
        assert not os.path.exists(os.path.join(current_dir, leftover))
    print("✓ Temporary directories properly cleaned up")

    # Report which of the expected output artifacts were produced.
    for directory in ("compressed_artifacts", "artifacts", "model", "output"):
        exists = os.path.exists(os.path.join(current_dir, directory))
        print(f"{'✓' if exists else '✗'} {directory}: {'Found' if exists else 'Not found'}")

    print("\nDistributed Training Configuration:")
    print(f" Training Image: {DEFAULT_CPU_IMAGE}")
    print(f" Container Count: {compute.instance_count}")
    print(f" Processes per Container: {distributed.process_count_per_node}")
    print(f" Total Training Processes: {compute.instance_count * distributed.process_count_per_node}")
else:
    print("Training was not successful.")
Step 7: Clean Up
Clean up local artifacts and temporary files.
# Best-effort: stop any docker compose stack local mode may have left running.
try:
    subprocess.run(["docker", "compose", "down", "-v"], check=False)
    print("Docker containers stopped")
except Exception:
    pass

# Remove the scratch workspace created in Step 1.
try:
    shutil.rmtree(temp_dir)
    print(f"Cleaned up temporary directory: {temp_dir}")
except Exception as e:
    print(f"Could not clean up temp directory: {e}")

# Remove any training artifacts written to the working directory.
current_dir = os.getcwd()
for directory in ("compressed_artifacts", "artifacts", "model", "output"):
    path = os.path.join(current_dir, directory)
    if not os.path.exists(path):
        continue
    try:
        shutil.rmtree(path)
        print(f"Cleaned up: {directory}")
    except Exception as e:
        print(f"Could not clean up {directory}: {e}")

# Fail the notebook run if training did not succeed.
assert operation_successful
print("\n✓ Distributed local training completed successfully!")
print("Cleanup completed - all artifacts removed.")
Summary
This notebook demonstrated:
Multi-container distributed training: Running training across multiple Docker containers locally
Torchrun integration: Using SageMaker’s Torchrun distributed driver
Local development workflow: Testing distributed training before cloud deployment
Proper cleanup: Following cleanup patterns for local artifacts
Distributed local training provides a great way to test distributed training patterns locally before deploying to SageMaker cloud instances, with no AWS costs and realistic container-based execution.