SageMakerCore Overview of Resource Level Abstractions - XGBoost Training Example#
Introduction#
SageMakerCore is a Python SDK designed as a lightweight layer over boto3, the AWS SDK for Python. It is built on the concept of resource level abstractions, where SageMaker Resources are represented as Python classes. This approach enables SageMakerCore to simplify the management of SageMaker Resources and provide a more object-oriented programming interface.
Resource Level Abstraction#
Resource Level Abstractions are best understood by examining how the AWS TrainingJob APIs are transformed into a TrainingJob Python class abstraction in SageMakerCore.
For instance, an AWS TrainingJob has the following APIs:
CreateTrainingJob
DescribeTrainingJob
UpdateTrainingJob
StopTrainingJob
ListTrainingJobs
In SageMakerCore, these APIs are encapsulated within a TrainingJob class that exposes these operations as methods and attributes. The details of the TrainingJob class are below:
class TrainingJob(Base):
    # Class attributes are mapped to the describe_training_job response
    training_job_name: str
    training_job_arn: Optional[str] = Unassigned()
    tuning_job_arn: Optional[str] = Unassigned()
    labeling_job_arn: Optional[str] = Unassigned()
    auto_ml_job_arn: Optional[str] = Unassigned()
    model_artifacts: Optional[ModelArtifacts] = Unassigned()
    training_job_status: Optional[str] = Unassigned()
    ...

    # Method parameters and bodies are omitted in this outline
    @classmethod
    def create(cls): ...  # Calls `create_training_job`

    @classmethod
    def get(cls): ...  # Calls `describe_training_job`

    @classmethod
    def get_all(cls): ...  # Calls `list_training_jobs`

    def update(self): ...  # Calls `update_training_job`

    def stop(self): ...  # Calls `stop_training_job`

    def refresh(self): ...  # Calls `describe_training_job` and refreshes instance attributes

    def wait(self): ...  # Calls `describe_training_job` and waits for TrainingJob to enter terminal state
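To make the mapping from API operations to class methods concrete, here is a minimal, self-contained sketch of the pattern. It is not SageMakerCore's actual implementation: `FakeClient` and `ToyTrainingJob` are hypothetical names invented for illustration, and the fake client stands in for a Boto3 client so the example runs without AWS credentials.

```python
from dataclasses import dataclass
from typing import Optional


class FakeClient:
    """Hypothetical stand-in for a Boto3 SageMaker client (makes no AWS calls)."""

    def __init__(self):
        self._jobs = {}

    def create_training_job(self, TrainingJobName):
        self._jobs[TrainingJobName] = "InProgress"
        return {"TrainingJobArn": f"arn:fake:training-job/{TrainingJobName}"}

    def describe_training_job(self, TrainingJobName):
        return {
            "TrainingJobName": TrainingJobName,
            "TrainingJobStatus": self._jobs[TrainingJobName],
        }

    def stop_training_job(self, TrainingJobName):
        self._jobs[TrainingJobName] = "Stopped"
        return {}


@dataclass
class ToyTrainingJob:
    """Hypothetical resource class: attributes mirror the describe response."""

    client: FakeClient
    training_job_name: str
    training_job_status: Optional[str] = None

    @classmethod
    def create(cls, client, training_job_name):
        # Create the resource, then describe it to populate the attributes.
        client.create_training_job(TrainingJobName=training_job_name)
        return cls.get(client, training_job_name)

    @classmethod
    def get(cls, client, training_job_name):
        return cls(client=client, training_job_name=training_job_name).refresh()

    def refresh(self):
        response = self.client.describe_training_job(
            TrainingJobName=self.training_job_name
        )
        self.training_job_status = response["TrainingJobStatus"]
        return self

    def stop(self):
        self.client.stop_training_job(TrainingJobName=self.training_job_name)
        self.refresh()


toy_job = ToyTrainingJob.create(FakeClient(), "demo-job")
status_after_create = toy_job.training_job_status
toy_job.stop()
status_after_stop = toy_job.training_job_status
```

The caller works with one object and its attributes, while the class decides which underlying operation to invoke.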
Comparing Boto3 and SageMakerCore SDKs#
In this notebook, we create an AWS TrainingJob that trains a model using the XGBoost container. We will use both the Boto3 and SageMakerCore SDKs to highlight and compare the differences in user experience for operations such as creating, waiting on, and listing AWS TrainingJobs.
Install Latest SageMakerCore#
All SageMakerCore beta distributions will be released to a private s3 bucket. After being allowlisted, run the cells below to install the latest version of SageMakerCore from s3://sagemaker-core-beta-artifacts/sagemaker_core-latest.tar.gz
Ensure you are using a kernel with Python version >=3.8.
# Uninstall previous version of sagemaker-core and restart kernel
!pip uninstall sagemaker-core -y
# Install the latest version of sagemaker-core
!pip install sagemaker-core --upgrade
# Check the version of sagemaker-core
!pip show -v sagemaker-core
Install Additional Packages#
# Install additional packages
!pip install -U scikit-learn pandas boto3
Setup#
Let’s start by specifying:
AWS region.
The IAM role ARN used to give learning and hosting access to your data. Ensure your environment has AWS credentials configured.
The S3 bucket that you want to use for storing training and model data.
from sagemaker.core.helper.session_helper import Session, get_execution_role
from rich import print
# Get region, role, bucket
sagemaker_session = Session()
region = sagemaker_session.boto_region_name
role = get_execution_role()
bucket = sagemaker_session.default_bucket()
print(role)
Load and Prepare Dataset#
For this example, we will use the Iris data set from sklearn.datasets to train a model with the XGBoost container.
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd
# Get IRIS Data
iris = load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names)
iris_df['target'] = iris.target
import os
# Prepare Data
os.makedirs('./data', exist_ok=True)
iris_df = iris_df[['target'] + [col for col in iris_df.columns if col != 'target']]
train_data, test_data = train_test_split(iris_df, test_size=0.2, random_state=42)
train_data.to_csv('./data/train.csv', index=False, header=False)
Upload Data to S3#
In this step, we will upload the contents of the local data directory (the training CSV written above) to the S3 bucket configured earlier using sagemaker_session.default_bucket().
# Upload Data
prefix = "DEMO-scikit-iris"
TRAIN_DATA = "train.csv"
DATA_DIRECTORY = "data"
train_input = sagemaker_session.upload_data(
    DATA_DIRECTORY, bucket=bucket, key_prefix="{}/{}".format(prefix, DATA_DIRECTORY)
)
s3_input_path = "s3://{}/{}/data/{}".format(bucket, prefix, TRAIN_DATA)
s3_output_path = "s3://{}/{}/output".format(bucket, prefix)
print(s3_input_path)
print(s3_output_path)
Fetch the XGBoost Image URI#
In this step, we will fetch the XGBoost image URI, which is used as an input parameter when creating an AWS TrainingJob.
from sagemaker.core import image_uris
image = image_uris.retrieve(
    framework="xgboost",
    region=region,
    version='latest'
)
Create TrainingJob with Boto3#
With the necessary setup completed, we can now create an AWS TrainingJob. We begin by creating a TrainingJob with Boto3 to understand what the experience is like when interacting directly with low-level APIs.
When executing the following cells there are a few things to note about the experience with Boto3:
Boto3 dynamically generates the API operation methods like create_training_job. When a client is instantiated, the methods are generated from the JSON service model description and are not statically coded into the boto3 library.
Boto3 returns a JSON response. As a result, users must either be familiar with the structure of these responses or refer to the documentation to parse them correctly.
Boto3 client methods expect keyword arguments. As with JSON responses, users must know which keyword arguments are expected or refer to the documentation to pass them correctly.
# Create TrainingJob with Boto3
import time
import boto3
client = boto3.client('sagemaker')
job_name_boto = 'xgboost-iris-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
response = client.create_training_job(
    TrainingJobName=job_name_boto,
    HyperParameters={
        'objective': 'multi:softmax',
        'num_class': '3',
        'num_round': '10',
        'eval_metric': 'merror'
    },
    AlgorithmSpecification={
        'TrainingImage': image,
        'TrainingInputMode': 'File'
    },
    RoleArn=role,
    InputDataConfig=[
        {
            'ChannelName': 'train',
            'ContentType': 'csv',
            'DataSource': {
                'S3DataSource': {
                    'S3DataType': 'S3Prefix',
                    'S3Uri': s3_input_path,
                    'S3DataDistributionType': 'FullyReplicated'
                }
            },
            'CompressionType': 'None',
            'RecordWrapperType': 'None'
        }
    ],
    OutputDataConfig={
        'S3OutputPath': s3_output_path
    },
    ResourceConfig={
        'InstanceType': 'ml.m4.xlarge',
        'InstanceCount': 1,
        'VolumeSizeInGB': 30
    },
    StoppingCondition={
        'MaxRuntimeInSeconds': 600
    }
)
print(response)
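The first point in the list above, that client methods are generated from a service model at instantiation time rather than written by hand, can be illustrated with a toy sketch. Everything here is hypothetical and heavily simplified: `SERVICE_MODEL`, `ToyDynamicClient`, and the use of `TypeError` for missing arguments are assumptions made for illustration and do not reflect Boto3's internals.

```python
import json

# Hypothetical, heavily simplified service model; real Boto3 models are far richer.
SERVICE_MODEL = json.loads("""
{
  "operations": {
    "CreateTrainingJob": {"required": ["TrainingJobName", "RoleArn"]},
    "StopTrainingJob": {"required": ["TrainingJobName"]}
  }
}
""")


def _snake(name):
    """Convert a simple PascalCase operation name to snake_case."""
    return "".join("_" + c.lower() if c.isupper() else c for c in name).lstrip("_")


class ToyDynamicClient:
    """Hypothetical client whose methods are attached when it is instantiated."""

    def __init__(self, model):
        for op_name, spec in model["operations"].items():
            setattr(self, _snake(op_name), self._make_method(op_name, spec["required"]))

    @staticmethod
    def _make_method(op_name, required):
        def method(**kwargs):
            # Only keyword arguments are accepted, mirroring the Boto3 calling style.
            missing = [key for key in required if key not in kwargs]
            if missing:
                raise TypeError(f"{op_name} missing required arguments: {missing}")
            return {"Operation": op_name, "Parameters": kwargs}
        return method


toy_client = ToyDynamicClient(SERVICE_MODEL)
toy_response = toy_client.create_training_job(
    TrainingJobName="demo", RoleArn="arn:aws:iam::123456789012:role/demo"
)
```

Because `create_training_job` does not exist in the class source, an IDE cannot offer completion or type hints for it, which is the usability gap the statically generated SageMakerCore classes aim to close.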
Wait for TrainingJob with Boto3#
After creating a TrainingJob, a user will often want to wait for it to complete. Below is an example of how a user can wait on a TrainingJob using Boto3. Notably, this requires writing logic that polls the TrainingJob with describe_training_job until the TrainingJobStatus is 'Failed', 'Completed', or 'Stopped'.
# Wait for TrainingJob with Boto3
import time
while True:
    response = client.describe_training_job(TrainingJobName=job_name_boto)
    status = response['TrainingJobStatus']
    if status in ['Failed', 'Completed', 'Stopped']:
        if status == 'Failed':
            print(response['FailureReason'])
        break
    print("-", end="")
    time.sleep(5)
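The polling loop above runs until the job reaches a terminal state, however long that takes. As a hedged variation, the sketch below wraps the same idea in a reusable helper with a timeout. The helper name `wait_for_terminal_status` and its parameters are hypothetical, and the describe call is simulated so the example runs without AWS access.

```python
import time


def wait_for_terminal_status(describe, poll_seconds=5.0, timeout_seconds=600.0,
                             terminal=("Completed", "Failed", "Stopped")):
    """Poll describe() until the status is terminal or the timeout elapses."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = describe()
        status = response["TrainingJobStatus"]
        if status in terminal:
            return response
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job still {status!r} after {timeout_seconds} seconds")
        time.sleep(poll_seconds)


# Simulated describe responses so the sketch runs without AWS access.
_simulated = iter([
    {"TrainingJobStatus": "InProgress"},
    {"TrainingJobStatus": "InProgress"},
    {"TrainingJobStatus": "Completed"},
])
final_response = wait_for_terminal_status(lambda: next(_simulated), poll_seconds=0.01)
```

With a real client, the `describe` argument could be `lambda: client.describe_training_job(TrainingJobName=job_name_boto)`.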
Create TrainingJob with SageMakerCore#
In this step we will use SageMakerCore to create a TrainingJob to understand what experience the object-oriented resource level abstractions provide for users.
When executing the following cells, there are a few things to note about the experience with SageMakerCore:
SageMakerCore generates Python classes and methods from the service model JSON, similar to Boto3. However, this generation is done prior to a release, resulting in a statically coded interface in the library.
SageMakerCore adopts an object-oriented approach, providing users with clear visibility of available methods and attributes through type hinting and IDE IntelliSense.
Instead of returning JSON responses like Boto3, SageMakerCore returns objects. This allows users to access response attributes directly from the returned object, eliminating the need to parse JSON or refer to the documentation for structure details.
# Create TrainingJob with SageMakerCore
import time
from sagemaker.core.resources import TrainingJob, AlgorithmSpecification, Channel, DataSource, S3DataSource, \
    OutputDataConfig, ResourceConfig, StoppingCondition
job_name_v3 = 'xgboost-iris-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
training_job = TrainingJob.create(
    training_job_name=job_name_v3,
    hyper_parameters={
        'objective': 'multi:softmax',
        'num_class': '3',
        'num_round': '10',
        'eval_metric': 'merror'
    },
    algorithm_specification=AlgorithmSpecification(
        training_image=image,
        training_input_mode='File'
    ),
    role_arn=role,
    input_data_config=[
        Channel(
            channel_name='train',
            content_type='csv',
            compression_type='None',
            record_wrapper_type='None',
            data_source=DataSource(
                s3_data_source=S3DataSource(
                    s3_data_type='S3Prefix',
                    s3_uri=s3_input_path,
                    s3_data_distribution_type='FullyReplicated'
                )
            )
        )
    ],
    output_data_config=OutputDataConfig(
        s3_output_path=s3_output_path
    ),
    resource_config=ResourceConfig(
        instance_type='ml.m4.xlarge',
        instance_count=1,
        volume_size_in_gb=30
    ),
    stopping_condition=StoppingCondition(
        max_runtime_in_seconds=600
    )
)
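To illustrate the difference between dictionary-style and attribute-style access noted earlier in this section, the sketch below converts a Boto3-style response dictionary into an object with snake_case attributes. This is a naive illustration, not how SageMakerCore builds its typed classes: `to_snake_case` and `to_object` are hypothetical helpers, the sample response is made up, and a real converter would need to leave user-defined keys such as hyperparameter names untouched.

```python
import re
from types import SimpleNamespace


def to_snake_case(name):
    """Convert a PascalCase key such as 'TrainingJobStatus' to 'training_job_status'."""
    partial = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", partial).lower()


def to_object(value):
    """Recursively turn dictionaries into attribute-accessible namespaces."""
    if isinstance(value, dict):
        return SimpleNamespace(**{to_snake_case(k): to_object(v) for k, v in value.items()})
    if isinstance(value, list):
        return [to_object(item) for item in value]
    return value


# Made-up response in the shape returned by describe_training_job.
sample_response = {
    "TrainingJobName": "xgboost-iris-demo",
    "TrainingJobStatus": "Completed",
    "ModelArtifacts": {"S3ModelArtifacts": "s3://example-bucket/output/model.tar.gz"},
    "ResourceConfig": {"InstanceCount": 1, "VolumeSizeInGB": 30},
}
sample_job = to_object(sample_response)

# Dictionary access versus attribute access to the same value.
from_dict = sample_response["ModelArtifacts"]["S3ModelArtifacts"]
from_object = sample_job.model_artifacts.s3_model_artifacts
```

With the TrainingJob object created above, the equivalent access would be an attribute such as `training_job.training_job_status`, as listed in the class outline at the start of this notebook.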
Wait for TrainingJob with SageMakerCore#
In SageMakerCore, the logic required to wait on a resource is abstracted away using a wait() method. As a result, a user can directly call the wait() method on a TrainingJob object instance like below.
# Wait for TrainingJob with SageMakerCore
training_job.wait(logs=True)
List TrainingJobs with Boto3#
When a user lists TrainingJobs, there are 2 main approaches provided by Boto3.
The first is calling list_training_jobs directly and implementing some logic to handle the NextToken provided in the response to enable pagination.
The second is utilizing the Boto3 get_paginator method to get a paginator that encapsulates the NextToken and simplifies the logic required.
Both approaches are shown below. Although the Boto3-provided paginator simplifies the logic compared with handling NextToken manually, in both cases the user must understand the structure of the list responses or refer to the docs (i.e., know to access the job summaries via response["TrainingJobSummaries"]).
# List TrainingJobs with Boto3
import datetime
import boto3
client = boto3.client('sagemaker')
creation_time_after = datetime.datetime.now() - datetime.timedelta(days=1)
# List TrainingJobs with NextToken
next_token = None
while True:
    if next_token:
        response = client.list_training_jobs(CreationTimeAfter=creation_time_after, NextToken=next_token)
    else:
        response = client.list_training_jobs(CreationTimeAfter=creation_time_after)
    for job in response['TrainingJobSummaries']:
        print(job['TrainingJobName'], job["TrainingJobStatus"])
    next_token = response.get('NextToken')
    if not next_token:
        break
import datetime
import boto3
client = boto3.client('sagemaker')
creation_time_after = datetime.datetime.now() - datetime.timedelta(days=1)
# List TrainingJobs with Boto3 Paginator
paginator = client.get_paginator('list_training_jobs')
for response in paginator.paginate(CreationTimeAfter=creation_time_after):
    for job in response['TrainingJobSummaries']:
        print(job['TrainingJobName'], job["TrainingJobStatus"])
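The NextToken handling shown above can also be folded into a generator so that callers see a flat stream of summaries. The following is a minimal sketch under stated assumptions: `iter_training_job_summaries` and `fake_list_training_jobs` are hypothetical names, and the two-page listing is fabricated test data used in place of a real `client.list_training_jobs` call.

```python
def iter_training_job_summaries(list_page, **kwargs):
    """Yield summaries across all pages, following NextToken until it is absent."""
    next_token = None
    while True:
        params = dict(kwargs)
        if next_token:
            params["NextToken"] = next_token
        response = list_page(**params)
        yield from response["TrainingJobSummaries"]
        next_token = response.get("NextToken")
        if not next_token:
            break


# Hypothetical two-page listing used in place of client.list_training_jobs.
_fake_pages = {
    None: {"TrainingJobSummaries": [{"TrainingJobName": "job-a"}], "NextToken": "t1"},
    "t1": {"TrainingJobSummaries": [{"TrainingJobName": "job-b"}]},
}


def fake_list_training_jobs(NextToken=None):
    return _fake_pages[NextToken]


job_names = [
    summary["TrainingJobName"]
    for summary in iter_training_job_summaries(fake_list_training_jobs)
]
```

Passing `client.list_training_jobs` as `list_page`, along with filters such as `CreationTimeAfter`, would apply the same loop to the real API. The caller still has to know the `TrainingJobSummaries` key, which is the point the comparison makes.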
List TrainingJobs with SageMakerCore#
In SageMakerCore, listing works similarly to the Boto3 paginator approach, but uses a ResourceIterator, which implements the Python iterator protocol and instantiates and returns resource objects only as they are accessed.
Below is an example of how the get_all() method can be used to list TrainingJobs.
# List TrainingJobs with SageMakerCore
import datetime
from sagemaker.core.resources import TrainingJob
creation_time_after = datetime.datetime.now() - datetime.timedelta(days=1)
resource_iterator = TrainingJob.get_all(creation_time_after=creation_time_after)
for job in resource_iterator:
    print(job.training_job_name, job.training_job_status)
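To show what "instantiating resource objects only as they are accessed" can look like, here is a small sketch of an iterator that fetches pages on demand and builds one object per `next()` call. It is an illustration of the iterator protocol only, not the real ResourceIterator: `ToyResourceIterator`, `ToySummary`, and the page data are all hypothetical.

```python
from dataclasses import dataclass


@dataclass
class ToySummary:
    training_job_name: str
    training_job_status: str


class ToyResourceIterator:
    """Hypothetical iterator that fetches pages on demand and builds objects lazily."""

    def __init__(self, fetch_page):
        self._fetch_page = fetch_page  # callable: token -> (items, next_token)
        self._buffer = []
        self._next_token = None
        self._exhausted = False
        self.pages_fetched = 0

    def __iter__(self):
        return self

    def __next__(self):
        # Fetch another page only when the local buffer has run dry.
        while not self._buffer:
            if self._exhausted:
                raise StopIteration
            items, self._next_token = self._fetch_page(self._next_token)
            self.pages_fetched += 1
            self._buffer = list(items)
            self._exhausted = self._next_token is None
        # Instantiate the object only at the moment it is requested.
        return ToySummary(**self._buffer.pop(0))


# Fabricated two-page listing keyed by pagination token.
_toy_pages = {
    None: ([{"training_job_name": "job-a", "training_job_status": "Completed"}], "t1"),
    "t1": ([{"training_job_name": "job-b", "training_job_status": "Failed"}], None),
}
toy_iterator = ToyResourceIterator(lambda token: _toy_pages[token])
first_job = next(toy_iterator)
pages_after_first = toy_iterator.pages_fetched
remaining_jobs = list(toy_iterator)
```

After the first `next()` call only one page has been requested; the second page is fetched when iteration continues.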
Delete All SageMaker Resources#
The following code block calls the delete() method on any SageMaker Core resources created during the execution of this notebook that were assigned to local or global variables. If you created any additional deletable resources without assigning the returned object to a unique variable, you will need to delete the resource manually with something like:
resource = Resource.get("resource-name")
resource.delete()
# Delete any sagemaker core resource objects created in this notebook
def delete_all_sagemaker_resources():
    all_objects = list(locals().values()) + list(globals().values())
    deletable_objects = [obj for obj in all_objects if hasattr(obj, 'delete') and obj.__class__.__module__ == 'sagemaker.core.resources']
    for obj in deletable_objects:
        obj.delete()
delete_all_sagemaker_resources()
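The selection logic used by the cleanup cell (keep objects that have a delete() method and whose class comes from a given module) can be tried out safely on stand-in objects first. The sketch below is a hypothetical variation: `find_deletable` and `ToyResource` are invented names, and the variation also skips duplicates so an object bound to two variable names is deleted only once.

```python
def find_deletable(namespace, module_name):
    """Return unique objects in `namespace` that define delete() and whose class lives in `module_name`."""
    seen, found = set(), []
    for obj in namespace.values():
        if id(obj) in seen:
            continue
        if type(obj).__module__ == module_name and callable(getattr(obj, "delete", None)):
            seen.add(id(obj))
            found.append(obj)
    return found


class ToyResource:
    """Hypothetical stand-in for a resource object; delete() only sets a flag."""

    def __init__(self, name):
        self.name = name
        self.deleted = False

    def delete(self):
        self.deleted = True


toy_endpoint = ToyResource("endpoint")
toy_namespace = {"a": toy_endpoint, "alias": toy_endpoint, "n": 42, "s": "text"}
toy_targets = find_deletable(toy_namespace, ToyResource.__module__)
for toy_obj in toy_targets:
    toy_obj.delete()
```

In the notebook itself, the namespace would be `globals()` and the module name `'sagemaker.core.resources'`, matching the filter in the cell above.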