SageMaker V3 Local Training Example#
This notebook demonstrates how to use SageMaker V3 ModelTrainer in Local Container mode for testing training jobs in Docker containers locally. Note: because it requires a local Docker daemon, this notebook will not run in SageMaker Studio.
# Standard-library, NumPy, and SageMaker imports used throughout the notebook.
# (The original cell imported `os` twice; the duplicate has been removed.)
import os
import subprocess
import sys
import tempfile
import shutil

import numpy as np

from sagemaker.train.model_trainer import ModelTrainer, Mode
from sagemaker.train.configs import SourceCode, Compute, InputData
from sagemaker.core.helper.session_helper import Session

# NOTE: Local mode requires Docker to be installed and running.
# On macOS, Docker Desktop's CLI binaries are often not on the notebook
# kernel's PATH, so prepend the usual install locations.
if sys.platform == 'darwin':
    os.environ['PATH'] = '/usr/local/bin:/Applications/Docker.app/Contents/Resources/bin:/Applications/Docker.app/Contents/Resources/cli-plugins:' + os.environ['PATH']
Step 1: Set Up Session and Working Directories#
Initialize the SageMaker session, resolve the regional training image, and create the temporary directories that will hold the training code and data (the data and scripts themselves are created in Step 2).
# Build a SageMaker session and resolve the regional training image.
from sagemaker.core import image_uris
import platform

sagemaker_session = Session()
region = sagemaker_session.boto_region_name

# Look up the CPU training container for PyTorch 2.0.0 / Python 3.10
# in this session's region.
DEFAULT_CPU_IMAGE = image_uris.retrieve(
    framework="pytorch",
    py_version="py310",
    version="2.0.0",
    region=region,
    instance_type="ml.m5.xlarge",
    image_scope="training",
)

# On Apple Silicon, force Docker to run the x86_64 training image under
# emulation so the pulled container matches the ECR image architecture.
if platform.machine() == 'arm64':
    os.environ['DOCKER_DEFAULT_PLATFORM'] = 'linux/amd64'
# Lay out a scratch workspace: <tmp>/source for training code,
# <tmp>/data/train and <tmp>/data/test for the input channels.
temp_dir = tempfile.mkdtemp()
source_dir = os.path.join(temp_dir, "source")
data_dir = os.path.join(temp_dir, "data")
train_dir = os.path.join(data_dir, "train")
test_dir = os.path.join(data_dir, "test")

for workspace_dir in (source_dir, train_dir, test_dir):
    os.makedirs(workspace_dir, exist_ok=True)

print(f"Created temporary directories in: {temp_dir}")
Step 2: Create Training Data and Scripts#
Generate the test data and training scripts needed for local container training.
# Generate a small synthetic regression dataset (fixed seed keeps the
# draws reproducible) and persist each array as a .npy file in its channel
# directory. Draw and save order match the original cell exactly.
np.random.seed(42)
x_train = np.random.randn(100, 4).astype(np.float32)
y_train = np.random.randn(100).astype(np.float32)
x_test = np.random.randn(20, 4).astype(np.float32)
y_test = np.random.randn(20).astype(np.float32)

for channel_dir, arrays in (
    (train_dir, {"x_train": x_train, "y_train": y_train}),
    (test_dir, {"x_test": x_test, "y_test": y_test}),
):
    for array_name, array in arrays.items():
        np.save(os.path.join(channel_dir, array_name + ".npy"), array)

print(f"Created training data: {x_train.shape}, {y_train.shape}")
print(f"Created test data: {x_test.shape}, {y_test.shape}")
# Create pytorch model definition
# Source for pytorch_model_def.py: a tiny 4 -> 10 -> 1 MLP that the
# training script imports inside the container. The literal is written
# verbatim into source_dir.
pytorch_model_def = '''
import torch
import torch.nn as nn


def get_model():
    return nn.Sequential(
        nn.Linear(4, 10),
        nn.ReLU(),
        nn.Linear(10, 1)
    )
'''

with open(os.path.join(source_dir, "pytorch_model_def.py"), 'w') as f:
    f.write(pytorch_model_def)
print("Created pytorch_model_def.py")
# Create training script
# Source for local_training_script.py. Inside the container it loads the
# .npy arrays from the channel directories under /opt/ml/input/data,
# trains the small MLP for one epoch with SGD, prints the test MSE, and
# saves the model state dict (plus a copy of the code for later
# inference) under SM_MODEL_DIR. The literal is written verbatim below.
training_script = '''
import argparse
import numpy as np
import os
import sys
import logging
import json
import shutil
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from pytorch_model_def import get_model

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

current_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = "/opt/ml/input/data"


def get_train_data(train_dir):
    x_train = np.load(os.path.join(train_dir, "x_train.npy"))
    y_train = np.load(os.path.join(train_dir, "y_train.npy"))
    logger.info(f"x train: {x_train.shape}, y train: {y_train.shape}")
    return torch.from_numpy(x_train), torch.from_numpy(y_train)


def get_test_data(test_dir):
    x_test = np.load(os.path.join(test_dir, "x_test.npy"))
    y_test = np.load(os.path.join(test_dir, "y_test.npy"))
    logger.info(f"x test: {x_test.shape}, y test: {y_test.shape}")
    return torch.from_numpy(x_test), torch.from_numpy(y_test)


def train():
    train_dir = os.path.join(data_dir, "train")
    test_dir = os.path.join(data_dir, "test")
    model_dir = os.environ.get("SM_MODEL_DIR", os.path.join(current_dir, "data/model"))

    x_train, y_train = get_train_data(train_dir)
    x_test, y_test = get_test_data(test_dir)
    train_ds = TensorDataset(x_train, y_train)

    batch_size = 64
    epochs = 1
    learning_rate = 0.1
    logger.info(f"batch_size = {batch_size}, epochs = {epochs}, learning rate = {learning_rate}")

    train_dl = DataLoader(train_ds, batch_size, shuffle=True)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = get_model().to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        for x_train_batch, y_train_batch in train_dl:
            y = model(x_train_batch.float())
            loss = criterion(y.flatten(), y_train_batch.float())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        epoch += 1
        logger.info(f"epoch: {epoch} -> loss: {loss}")

    with torch.no_grad():
        y = model(x_test.float()).flatten()
        mse = ((y - y_test) ** 2).sum() / y_test.shape[0]
        print("Test MSE:", mse.numpy())

    os.makedirs(model_dir, exist_ok=True)
    torch.save(model.state_dict(), model_dir + "/model.pth")

    inference_code_path = model_dir + "/code/"
    if not os.path.exists(inference_code_path):
        os.mkdir(inference_code_path)
        logger.info(f"Created a folder at {inference_code_path}!")

    shutil.copy("local_training_script.py", inference_code_path)
    shutil.copy("pytorch_model_def.py", inference_code_path)
    logger.info(f"Saving models files to {inference_code_path}")


if __name__ == "__main__":
    print("Running the training job ...")
    train()
'''

# Write the script into the source directory that ModelTrainer mounts
# into the container.
with open(os.path.join(source_dir, "local_training_script.py"), 'w') as f:
    f.write(training_script)
print("Created local_training_script.py")
Step 3: Configure Local Container Training#
Set up ModelTrainer to run in LOCAL_CONTAINER mode.
# Training code location and the entry point script created above.
source_code = SourceCode(
    source_dir=source_dir,
    entry_script="local_training_script.py",
)

# "local_cpu" requests a CPU container on the local Docker daemon rather
# than a cloud instance.
compute = Compute(
    instance_type="local_cpu",
    instance_count=1,
)

# Input channels; each is exposed to the container under
# /opt/ml/input/data/<channel_name>, which is where the training script
# reads its "train" and "test" data from.
train_data = InputData(
    channel_name="train",
    data_source=train_dir,
)

test_data = InputData(
    channel_name="test",
    data_source=test_dir,
)

print("Configuration complete")
Step 4: Create ModelTrainer#
Initialize ModelTrainer with the local container configuration.
# Assemble the ModelTrainer. Mode.LOCAL_CONTAINER runs the training job
# in a local Docker container using the resolved training image instead
# of launching a SageMaker training instance.
model_trainer = ModelTrainer(
    training_image=DEFAULT_CPU_IMAGE,
    sagemaker_session=sagemaker_session,
    source_code=source_code,
    compute=compute,
    input_data_config=[train_data, test_data],
    base_job_name="local_mode_single_container_local_data",
    training_mode=Mode.LOCAL_CONTAINER,
)
print("ModelTrainer created with LOCAL_CONTAINER mode!")
Step 5: Run Local Container Training#
Start the training job in local Docker container.
# Kick off the local training run. Failures are reported rather than
# raised so the notebook can still reach the inspection/cleanup steps;
# operation_successful records the outcome for Step 6.
operation_successful = False
print("Starting local container training...")
try:
    model_trainer.train()
except Exception as e:
    print(f"Training failed with error: {e}")
else:
    print("Local container training completed successfully!")
    operation_successful = True
Step 6: Check Training Results#
Examine the training artifacts created by local container training.
# Inspect the working directory for artifacts the local trainer produced.
if operation_successful:
    current_dir = os.getcwd()
    print("Training Results:")
    for directory in ("compressed_artifacts", "artifacts", "model", "output"):
        path = os.path.join(current_dir, directory)
        if not os.path.exists(path):
            print(f"✗ {directory}: Not found")
            continue
        print(f"✓ {directory}: Found")
        if os.path.isdir(path):
            print(f"  Contents: {os.listdir(path)}")
    print("Local container training completed successfully!")
else:
    print("Training was not successful.")
Step 7: Clean Up#
Clean up local artifacts and temporary files.
# Best-effort teardown of any containers docker compose left running.
try:
    subprocess.run(["docker", "compose", "down", "-v"], check=False)
    print("Docker containers stopped")
except Exception:
    pass

# Clean up temporary files
# Remove the scratch workspace created in Step 1.
try:
    shutil.rmtree(temp_dir)
    print(f"Cleaned up temporary directory: {temp_dir}")
except Exception as e:
    print(f"Could not clean up temp directory: {e}")

# Clean up training artifacts
# Remove artifacts the local trainer wrote into the working directory.
current_dir = os.getcwd()
for directory in ("compressed_artifacts", "artifacts", "model", "output"):
    path = os.path.join(current_dir, directory)
    if not os.path.exists(path):
        continue
    try:
        shutil.rmtree(path)
        print(f"Cleaned up: {directory}")
    except Exception as e:
        print(f"Could not clean up {directory}: {e}")

print("Cleanup completed!")
Summary#
This notebook demonstrated:
Local container training: Running training in Docker containers locally
Data preparation: Creating test data and training scripts
Artifact management: Checking and cleaning up training artifacts
Docker integration: Proper container lifecycle management
Local container training provides a great way to test training jobs locally before deploying to SageMaker cloud instances.