Get started with SageMaker#

In this notebook you’ll learn how SageMaker can be used to:

  1. Preprocess (and optionally explore) a dataset

  2. Train an XGBoost classifier for customer churn prediction, using a managed SageMaker Training job and a SageMaker-managed container image.

  3. Perform hyperparameter tuning to find the optimal set of hyperparameters, using a managed SageMaker Hyperparameter Tuning job.

  4. Perform batch inference using a managed SageMaker Batch Transform job.

  5. Create a managed real-time SageMaker endpoint.

All SageMaker resources are created using the SageMaker Core SDK. You can find more information about sagemaker-core here

%pip install --upgrade pip -q
%pip install sagemaker-core -q
import time
from sagemaker.core.helper.session_helper import Session, get_execution_role

# Resolve the session-wide settings (region, execution role, default bucket)
# that the rest of the notebook reuses.
sagemaker_session = Session()
region = sagemaker_session.boto_region_name
role = get_execution_role()
bucket = sagemaker_session.default_bucket()

# Echo the resolved settings so a misconfigured environment is visible early.
for label, value in (
    ("AWS region", region),
    ("Execution role", role),
    ("Default S3 bucket", bucket),
):
    print(f"{label}: {value}")

Preprocess dataset#

We’ll use a synthetic dataset that AWS provides for customer churn prediction.

NOTE: This sample doesn't perform any exploratory data analysis, since the preprocessing steps this dataset needs are already known.

If you’re interested in how to perform exploratory analysis, the sagemaker-python-sdk documentation has a section that explores this dataset, here.

Read the data from S3#

from io import StringIO
import pandas as pd

# Bucket holding the example files for the current region, and the object key
# of the synthetic churn dataset inside it.
example_files_bucket = f"sagemaker-example-files-prod-{region}"
churn_dataset_key = "datasets/tabular/synthetic/churn.txt"

# Read the object and wrap its contents in StringIO so pandas can parse it as CSV.
data = sagemaker_session.read_s3_file(example_files_bucket, churn_dataset_key)
df = pd.read_csv(StringIO(data))

# Last expression of the cell: displays the loaded DataFrame.
df

Apply processing#

from sklearn.model_selection import train_test_split

# Phone number is unique - will not add value to classifier
df = df.drop("Phone", axis=1)

# Cast Area Code to non-numeric so it is handled as a category, not a quantity
df["Area Code"] = df["Area Code"].astype(object)

# Remove one feature from each highly correlated pair
df = df.drop(["Day Charge", "Eve Charge", "Night Charge", "Intl Charge"], axis=1)

# One-hot encode categorical features into numeric features
model_data = pd.get_dummies(df)

# Move the target column ("Churn?_True.") to the front and drop the redundant
# "Churn?_False." indicator, then make every column a float.
model_data = pd.concat(
    [model_data["Churn?_True."], model_data.drop(["Churn?_False.", "Churn?_True."], axis=1)], axis=1
)
model_data = model_data.astype(float)

# Split data into train (67%) and validation (33%) datasets
train_data, validation_data = train_test_split(
    model_data, test_size=0.33, random_state=42)

# Further split the validation dataset into validation (67% of it) and test (33% of it) datasets.
validation_data, test_data = train_test_split(
    validation_data, test_size=0.33, random_state=42)

# Remove and store the target column for the test data. This is used for calculating performance metrics after training, on unseen data.
# The column is dropped by reassignment rather than with inplace=True, so the
# frame returned by train_test_split is never mutated in place (which can
# trigger pandas' SettingWithCopyWarning).
test_target_column = test_data['Churn?_True.']
test_data = test_data.drop(['Churn?_True.'], axis=1)

# Store all datasets locally, without header or index columns
train_data.to_csv("train.csv", header=False, index=False)
validation_data.to_csv("validation.csv", header=False, index=False)
test_data.to_csv("test.csv", header=False, index=False)

# Upload each dataset to S3 and keep the returned locations for later cells
s3_train_input = sagemaker_session.upload_data('train.csv', bucket)
s3_validation_input = sagemaker_session.upload_data('validation.csv', bucket)
s3_test_input = sagemaker_session.upload_data('test.csv', bucket)

print('Datasets uploaded to:')
print(s3_train_input)
print(s3_validation_input)
print(s3_test_input)

Train a classifier using XGBoost#

Use SageMaker Training and the managed XGBoost image to train a classifier.
More details on how to use SageMaker managed training with XGBoost can be found here.

NOTE: For more information on using SageMaker managed container images and retrieving their ECR paths, here is the documentation. Please note that the image URI might need to be updated based on your selected AWS region.
from sagemaker.core import image_uris

# Look up the URI of the managed XGBoost container image for the current region.
xgboost_image_version = "1.7-1"
image = image_uris.retrieve(framework="xgboost", region=region, version=xgboost_image_version)
from sagemaker.core.resources import TrainingJob
from sagemaker.core.shapes import AlgorithmSpecification, Channel, DataSource, S3DataSource, ResourceConfig, StoppingCondition, OutputDataConfig

# Settings for the training job.
job_name = "xgboost-churn-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())  # Timestamped training job name
instance_type = "ml.m4.xlarge"  # SageMaker instance type to train on
instance_count = 1  # Number of training instances
volume_size_in_gb = 30  # Storage allocated to the training job
max_runtime_in_seconds = 600  # Maximum runtime; passed to the job's StoppingCondition
s3_output_path = f"s3://{bucket}"  # Bucket (and optional prefix) where the job stores output such as the model artifact

# Hyperparameters passed to the training job (all values are strings).
hyper_parameters = dict(
    max_depth="5",
    eta="0.2",
    gamma="4",
    min_child_weight="6",
    subsample="0.8",
    verbosity="0",
    objective="binary:logistic",
    num_round="100",
)

# One CSV input channel per dataset; both channels share the same S3 settings.
input_channels = [
    Channel(
        channel_name=channel_name,
        content_type="csv",
        data_source=DataSource(
            s3_data_source=S3DataSource(
                s3_data_type="S3Prefix",
                s3_uri=channel_uri,
                s3_data_distribution_type="FullyReplicated",
            )
        ),
    )
    for channel_name, channel_uri in (
        ("train", s3_train_input),
        ("validation", s3_validation_input),
    )
]

# Remaining pieces of the job specification, named for readability.
algorithm_specification = AlgorithmSpecification(
    training_image=image,
    training_input_mode="File",
)
output_data_config = OutputDataConfig(s3_output_path=s3_output_path)
resource_config = ResourceConfig(
    instance_type=instance_type,
    instance_count=instance_count,
    volume_size_in_gb=volume_size_in_gb,
)
stopping_condition = StoppingCondition(max_runtime_in_seconds=max_runtime_in_seconds)

# Create the training job.
training_job = TrainingJob.create(
    training_job_name=job_name,
    hyper_parameters=hyper_parameters,
    algorithm_specification=algorithm_specification,
    role_arn=role,
    input_data_config=input_channels,
    output_data_config=output_data_config,
    resource_config=resource_config,
    stopping_condition=stopping_condition,
)

# Block until the training job has finished.
training_job.wait()

Hyperparameter tuning#

If the optimal hyperparameters aren’t known, we perform a SageMaker Hyperparameter Tuning job, which runs several training jobs and iteratively finds the best set of parameters.

At a high level, a tuning job consists of two main components:

  • HyperParameterTrainingJobDefinition, which specifies the details of each individual training job, such as the image to use, input channels, resource configuration, etc.

  • HyperParameterTuningJobConfig, which details the tuning configuration, like what strategy to use, how many jobs to run and what parameters to tune etc.

You can find more information about how it works here.

from sagemaker.core.resources import HyperParameterTuningJob
from sagemaker.core.shapes import HyperParameterTuningJobConfig, \
     ResourceLimits, HyperParameterTuningJobWarmStartConfig, ParameterRanges, AutoParameter, \
     Autotune, HyperParameterTrainingJobDefinition, HyperParameterTuningJobObjective, HyperParameterAlgorithmSpecification, \
     OutputDataConfig, StoppingCondition, ResourceConfig

tuning_job_name = 'xgboost-tune-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())  # Name of tuning job

max_number_of_training_jobs = 50  # Maximum number of training jobs to run as part of the tuning job.
max_parallel_training_jobs = 5  # Maximum number of training jobs to run in parallel.
# Runtime limit in seconds. It is used twice below: as the stopping condition
# of each individual training job and as the runtime limit of the tuning job.
max_runtime_in_seconds = 3600

# Create HyperParameterTrainingJobDefinition object, containing information about each individual training job.
hyper_parameter_training_job_definition = HyperParameterTrainingJobDefinition(
    role_arn=role,
    algorithm_specification=HyperParameterAlgorithmSpecification(
        training_image=image,
        training_input_mode='File'
    ),
    input_data_config=[
        Channel(
            channel_name='train',
            content_type='csv',
            data_source=DataSource(
                s3_data_source=S3DataSource(
                    s3_data_type='S3Prefix',
                    s3_uri=s3_train_input,
                    s3_data_distribution_type='FullyReplicated'
                )
            )
        ),
        Channel(
            channel_name='validation',
            content_type='csv',
            data_source=DataSource(
                s3_data_source=S3DataSource(
                    s3_data_type='S3Prefix',
                    s3_uri=s3_validation_input,
                    s3_data_distribution_type='FullyReplicated'
                )
            )
        )
    ],
    output_data_config=OutputDataConfig(
        s3_output_path=s3_output_path
    ),
    stopping_condition=StoppingCondition(
        max_runtime_in_seconds=max_runtime_in_seconds
    ),
    resource_config=ResourceConfig(
        instance_type=instance_type,
        instance_count=instance_count,
        volume_size_in_gb=volume_size_in_gb,
    )
)

# Create HyperParameterTuningJobConfig object, containing information about the tuning job
tuning_job_config = HyperParameterTuningJobConfig(
    strategy='Bayesian',
    # Pick the training job with the highest validation AUC.
    hyper_parameter_tuning_job_objective=HyperParameterTuningJobObjective(
        type='Maximize',
        metric_name='validation:auc'
    ),
    resource_limits=ResourceLimits(
        max_number_of_training_jobs=max_number_of_training_jobs,
        max_parallel_training_jobs=max_parallel_training_jobs,
        # Use the variable defined above instead of repeating the literal, so
        # the two cannot drift apart.
        max_runtime_in_seconds=max_runtime_in_seconds
    ),
    training_job_early_stopping_type='Auto',
    # Hyperparameters to tune. Each AutoParameter supplies only a name and a
    # value hint; no explicit search range is given.
    parameter_ranges=ParameterRanges(
        auto_parameters=[
            AutoParameter(
                name='max_depth',
                value_hint='5'
            ),
            AutoParameter(
                name='eta',
                value_hint='0.1'
            ),
            AutoParameter(
                name='gamma',
                value_hint='8'
            ),
            AutoParameter(
                name='min_child_weight',
                value_hint='2'
            ),
            AutoParameter(
                name='subsample',
                value_hint='0.5'
            ),
            AutoParameter(
                name='num_round',
                value_hint='50'
            )
        ]
    )
)

# Create the tuning job using the 2 configuration objects above
tuning_job = HyperParameterTuningJob.create(
    hyper_parameter_tuning_job_name=tuning_job_name,
    autotune=Autotune(
        mode='Enabled'
    ),
    training_job_definition=hyper_parameter_training_job_definition,
    hyper_parameter_tuning_job_config=tuning_job_config
)

# Block until the tuning job has finished.
tuning_job.wait()

Use model artifacts for batch inference#

To use the model to perform batch inference, we can use a SageMaker Batch Transform job. The Transform Job requires a SageMaker model object, which contains information about what image and model to use.

Below, we:

  1. Create a SageMaker model from the same first-party image we used for training and the model artifacts produced during training. The same image can also be used to run inference.

  2. Use that SageMaker model with a Transform Job to perform batch inference on our test dataset.

  3. Compute some performance metrics

More information about SageMaker Batch Transform can be found here.

Create SageMaker Model#

Create a Model resource based on the model artifacts produced by the best training job run by the hyperparameter tuning job.

from sagemaker.core.resources import Model
from sagemaker.core.shapes import ContainerDefinition

# Locate the model artifacts of the best training job found by the tuning job.
# To use the standalone training job instead, replace the three lines below with:
#   model_s3_uri = training_job.model_artifacts.s3_model_artifacts
best_training_job_name = tuning_job.best_training_job.training_job_name
best_training_job = TrainingJob.get(best_training_job_name)
model_s3_uri = best_training_job.model_artifacts.s3_model_artifacts

# A SageMaker model pairs a container image with the model artifact to use.
churn_container = ContainerDefinition(image=image, model_data_url=model_s3_uri)
customer_churn_model = Model.create(
    model_name="customer-churn-xgboost",
    primary_container=churn_container,
    execution_role_arn=role,
)

Create Transform Job#

from sagemaker.core.resources import TransformJob
from sagemaker.core.shapes import TransformInput, TransformDataSource, TransformS3DataSource, TransformOutput, TransformResources

model_name = customer_churn_model.get_name()  # Name of the SageMaker model created in the previous step
# The prefix ends with "-" so the timestamp is separated from it, matching the
# naming of the training and tuning jobs.
transform_job_name = 'churn-prediction-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())  # Name of TransformJob
s3_output_path = f"s3://{bucket}/transform"  # bucket and optional prefix where the TransformJob stores the result.
instance_type = 'ml.m4.xlarge'  # SageMaker instance type to use for TransformJob
instance_count = 1  # Number of instances to use for TransformJob

# Create Transform Job: read the test CSV from S3, run it through the model,
# and write the predictions under s3_output_path.
transform_job = TransformJob.create(
    transform_job_name=transform_job_name,
    model_name=model_name,
    transform_input=TransformInput(
        data_source=TransformDataSource(
            s3_data_source=TransformS3DataSource(
                s3_data_type="S3Prefix",
                s3_uri=s3_test_input
            )
        ),
        content_type="text/csv"
    ),
    transform_output=TransformOutput(
        s3_output_path=s3_output_path
    ),
    transform_resources=TransformResources(
        instance_type=instance_type,
        instance_count=instance_count
    )
)

# Block until the transform job has finished.
transform_job.wait()

Compute performance metrics#

from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score

# A Transform Job writes its result under the output path, named after the
# input file with ".out" appended.
transform_input_uri = transform_job.transform_input.data_source.s3_data_source.s3_uri
output_file_name = transform_input_uri.split('/')[-1] + '.out'  # Name of the result file
output_s3_uri = f"{transform_job.transform_output.s3_output_path}/{output_file_name}"  # Full S3 URI of the result file

def split_s3_path(s3_path):
    """Split an S3 URI into its bucket name and object key.

    Only a leading ``s3://`` scheme is stripped, so a key that happens to
    contain the text ``s3://`` is returned unchanged.

    Args:
        s3_path: S3 location such as ``s3://bucket/prefix/file.csv``. The
            ``s3://`` scheme is optional.

    Returns:
        A ``(bucket, key)`` tuple. ``key`` is an empty string when the path
        contains only a bucket name.
    """
    scheme = "s3://"
    if s3_path.startswith(scheme):
        s3_path = s3_path[len(scheme):]
    # Everything before the first "/" is the bucket; the rest is the key.
    bucket, _, key = s3_path.partition("/")
    return bucket, key

def print_performance_metrics(probs, y, threshold=0.5):
    """Print accuracy, precision, recall and ROC AUC for predicted probabilities.

    Args:
        probs: Predicted probabilities. Must support element-wise ``>=`` and
            ``.astype`` (the notebook passes a pandas DataFrame).
        y: True labels to compare against.
        threshold: Probabilities greater than or equal to this value are
            counted as the positive class (1), all others as 0.
    """
    # Turn probabilities into hard 0/1 labels.
    hard_labels = (probs >= threshold).astype(int)

    # The label-based metrics use the thresholded labels; ROC AUC uses the raw
    # probabilities.
    report = {
        "Accuracy": accuracy_score(y, hard_labels),
        "Precision": precision_score(y, hard_labels),
        "Recall": recall_score(y, hard_labels),
        "ROC AUC": roc_auc_score(y, probs),
    }

    for metric_name, metric_value in report.items():
        print(f"{metric_name}: {metric_value}")


# Break the output URI into the bucket and key of the result file.
res_bucket, res_key = split_s3_path(output_s3_uri)

# Read the Transform Job result and parse it as CSV without a header row.
transform_job_result = pd.read_csv(
    StringIO(sagemaker_session.read_s3_file(res_bucket, res_key)),
    header=None,
)

# Score the batch predictions against the stored test target column.
print_performance_metrics(transform_job_result, test_target_column)

Create SageMaker endpoint for real-time inference#

To create a SageMaker endpoint we first create an EndpointConfig. The endpoint configuration specifies what SageMaker model to use, and what endpoint type. We then use the EndpointConfig together with other optional parameters to create a SageMaker Endpoint.

More information about SageMaker Endpoints can be found here.

from sagemaker.core.resources import Endpoint, EndpointConfig
from sagemaker.core.shapes import ProductionVariant

endpoint_config_name = "churn-prediction-endpoint-config"  # Name of endpoint configuration
model_name = customer_churn_model.get_name()  # Name of the SageMaker model created earlier
endpoint_name = "customer-churn-endpoint"  # Name of SageMaker endpoint

# Single production variant serving the model. instance_type is not set in
# this cell; it keeps the value assigned earlier in the notebook.
all_traffic_variant = ProductionVariant(
    variant_name="AllTraffic",
    model_name=model_name,
    instance_type=instance_type,
    initial_instance_count=1,
)

# The endpoint configuration describes which model to serve and on what resources.
endpoint_config = EndpointConfig.create(
    endpoint_config_name=endpoint_config_name,
    production_variants=[all_traffic_variant],
)

# Create the endpoint from the configuration and wait until it is in service.
sagemaker_endpoint = Endpoint.create(
    endpoint_name=endpoint_name,
    endpoint_config_name=endpoint_config.get_name(),
)
sagemaker_endpoint.wait_for_status(target_status="InService")

Test live endpoint - with one sample#

# Draw one random row from the test data and serialise it as a CSV string
# without header or index.
sample = test_data.sample(1)
sample_payload = sample.to_csv(header=False, index=False).strip()

# Invoke the live endpoint with the payload and decode the response body.
res = sagemaker_endpoint.invoke(body=sample_payload, content_type="text/csv")
response_body = res['Body']
result = response_body.read().decode('utf-8')

# Last expression of the cell: displays the decoded response.
result

Test live endpoint - with entire test dataset#

# Convert entire test dataset to CSV string
sample_payload = test_data.to_csv(header=False, index=False).strip()

# Send the payload to the live endpoint and decode the response
res = sagemaker_endpoint.invoke(body=sample_payload, content_type="text/csv")
result = res['Body'].read().decode('utf-8')

# Split the response into lines. splitlines() plus the blank-line filter
# keeps every non-empty line whether or not the response ends with a newline;
# slicing off the last element would silently drop a value when it does not.
result = [line for line in result.splitlines() if line.strip()]

# Compute performance metrics
df_result = pd.DataFrame(result).astype(float)
print_performance_metrics(df_result, test_target_column)

Clean up#

# Delete the resources created for real-time inference, in this order:
# endpoint, endpoint configuration, model.
for resource in (sagemaker_endpoint, endpoint_config, customer_churn_model):
    resource.delete()