Data Preprocessing with Scikit-learn Processing Job
Run a Scikit-learn processing job to preprocess the Abalone dataset into train, validation, and test splits.
from sagemaker.core.processing import ScriptProcessor
from sagemaker.core.shapes import (
    ProcessingInput,
    ProcessingOutput,
    ProcessingS3Input,
    ProcessingS3Output,
)
from sagemaker.mlops.workflow.steps import CacheConfig
from sagemaker.core.helper.session_helper import Session, get_execution_role
from sagemaker.core import image_uris
sagemaker_session = Session()
sm_client = sagemaker_session.sagemaker_client
region = sagemaker_session.boto_region_name
prefix = "pipeline-v3"
account_id = sagemaker_session.account_id()
# Define variables and parameters needed for the Pipeline steps
role = get_execution_role()
default_bucket = sagemaker_session.default_bucket()
base_job_prefix = "v3-job"
s3_prefix = "v3-test-pipeline"
default_bucket_prefix = sagemaker_session.default_bucket_prefix
# If a default bucket prefix is specified, append it to the s3 path
if default_bucket_prefix:
    s3_prefix = f"{default_bucket_prefix}/{s3_prefix}"
    base_job_prefix = f"{default_bucket_prefix}/{base_job_prefix}"
processing_instance_count = 1
training_instance_type = "ml.m5.xlarge"
model_approval_status = "PendingManualApproval"
input_data = f"s3://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv"
# Cache Pipeline steps to reduce execution time on subsequent executions
cache_config = CacheConfig(enable_caching=True, expire_after="30d")
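When this job is later wrapped in a pipeline step, cache_config lets SageMaker reuse a step's results for identical inputs within the 30-day expiry window. A minimal sketch of that usage, assuming a ProcessingStep class analogous to the classic SDK's (the import path and signature are assumptions; verify them for your SDK version):
# Hypothetical usage -- ProcessingStep location/signature is an assumption
# from sagemaker.mlops.workflow.steps import ProcessingStep
# step_process = ProcessingStep(
#     name="PreprocessAbaloneData",
#     step_args=processor_args,  # returned by sklearn_processor.run(...) below
#     cache_config=cache_config,
# )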
!mkdir -p code
%%writefile code/preprocess.py
"""Feature engineers the abalone dataset."""
import argparse
import logging
import os
import pathlib

import boto3
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())
# The CSV file has no header, so we specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"
feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64,
}
label_column_dtype = {"rings": np.float64}
def merge_two_dicts(x, y):
    """Merges two dicts, returning a new copy."""
    z = x.copy()
    z.update(y)
    return z
if __name__ == "__main__":
    logger.debug("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    args = parser.parse_args()

    # Processing containers mount their inputs and outputs under /opt/ml/processing
    base_dir = "/opt/ml/processing"
    pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True)

    # Parse the bucket and key out of the s3://bucket/key/... input URI
    input_data = args.input_data
    bucket = input_data.split("/")[2]
    key = "/".join(input_data.split("/")[3:])

    logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
    fn = f"{base_dir}/data/abalone-dataset.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, fn)

    logger.debug("Reading downloaded data.")
    df = pd.read_csv(
        fn,
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )
    os.unlink(fn)

    logger.debug("Defining transformers.")
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )

    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    logger.info("Applying transforms.")
    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    # Prepend the label so each output row is label-first (headerless CSV convention)
    X = np.concatenate((y_pre, X_pre), axis=1)

    logger.info("Splitting %d rows of data into train, validation, test datasets.", len(X))
    np.random.shuffle(X)
    # Cut the shuffled array at the 70% and 85% marks for a 70/15/15 split
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    logger.info("Writing out datasets to %s.", base_dir)
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
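The np.split call in preprocess.py cuts the shuffled array at the 70% and 85% marks, yielding a 70/15/15 split. A quick standalone check of that arithmetic:
import numpy as np

# Sanity check of the 70/15/15 split used in preprocess.py
X = np.arange(100).reshape(100, 1)
train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])
print(len(train), len(validation), len(test))  # 70 15 15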
# Run the preprocessing script as a SageMaker Processing job that splits the
# dataset into train, validation, and test sets.
# This uses the ScriptProcessor class (rather than SKLearnProcessor) with an
# sklearn framework image resolved via image_uris.retrieve.
# Resolve the scikit-learn 1.2-1 framework container image for this region
sklearn_image_uri = image_uris.retrieve(
    framework="sklearn",
    region=region,
    version="1.2-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

sklearn_processor = ScriptProcessor(
    image_uri=sklearn_image_uri,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    # Job names may not contain "/", which base_job_prefix can pick up from
    # default_bucket_prefix above, so sanitize it here
    base_job_name=f"{base_job_prefix.replace('/', '-')}-sklearn-preprocess-job",
    sagemaker_session=sagemaker_session,
    role=role,
)
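To confirm which container the job will run, you can inspect the resolved image URI before launching:
print(sklearn_image_uri)  # an ECR URI for the sklearn 1.2-1 image in your region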
processor_args = sklearn_processor.run(
    wait=False,
    inputs=[
        ProcessingInput(
            input_name="input-1",
            s3_input=ProcessingS3Input(
                s3_uri=input_data,
                local_path="/opt/ml/processing/input",
                s3_data_type="S3Prefix",
                s3_input_mode="File",
                s3_data_distribution_type="ShardedByS3Key",
            ),
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            s3_output=ProcessingS3Output(
                s3_uri=f"s3://{default_bucket}/{prefix}/train",
                local_path="/opt/ml/processing/train",
                s3_upload_mode="EndOfJob",
            ),
        ),
        ProcessingOutput(
            output_name="validation",
            s3_output=ProcessingS3Output(
                s3_uri=f"s3://{default_bucket}/{prefix}/validation",
                local_path="/opt/ml/processing/validation",
                s3_upload_mode="EndOfJob",
            ),
        ),
        ProcessingOutput(
            output_name="test",
            s3_output=ProcessingS3Output(
                s3_uri=f"s3://{default_bucket}/{prefix}/test",
                local_path="/opt/ml/processing/test",
                s3_upload_mode="EndOfJob",
            ),
        ),
    ],
    code="code/preprocess.py",
    arguments=["--input-data", input_data],
)
# The job was started with wait=False, so refresh its state and check the status
sklearn_processor.latest_job.refresh()
sklearn_processor.latest_job.processing_job_status
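Because the job runs asynchronously, the status above may still be InProgress. A minimal polling sketch using only the refresh()/processing_job_status calls shown above:
import time

# Poll until the processing job reaches a terminal state
while True:
    sklearn_processor.latest_job.refresh()
    status = sklearn_processor.latest_job.processing_job_status
    if status in ("Completed", "Failed", "Stopped"):
        break
    time.sleep(30)
print(f"Processing job finished with status: {status}")

# On success, the splits land at the S3 URIs configured above
for split in ("train", "validation", "test"):
    print(f"s3://{default_bucket}/{prefix}/{split}")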