EMR Serverless Step in SageMaker Pipelines#
This notebook demonstrates how to use the EMRServerlessStep to run Spark jobs on EMR Serverless within a SageMaker Pipeline.
Prerequisites#
An AWS account with EMR Serverless access
What This Notebook Does#
Creates an IAM role for EMR Serverless (if it doesn’t exist)
Creates a sample PySpark script
Copies sample data to your S3 bucket
Creates an EMR Serverless step that provisions a new application
Creates and executes a SageMaker Pipeline
from sagemaker.mlops.workflow.emr_serverless_step import (
EMRServerlessStep,
EMRServerlessJobConfig,
)
from sagemaker.mlops.workflow.pipeline import Pipeline
from sagemaker.core.workflow.parameters import ParameterString
from sagemaker.core.workflow.pipeline_context import PipelineSession
from sagemaker.core.helper.session_helper import Session, get_execution_role
# Sessions used throughout the notebook: `sagemaker_session` for account and
# bucket lookups, `pipeline_session` for building pipeline step definitions.
sagemaker_session = Session()
pipeline_session = PipelineSession()

account_id = sagemaker_session.account_id()
region = sagemaker_session.boto_region_name

print(f"Region: {region}")
print(f"Account ID: {account_id}")
# Execution role, default artifact bucket, and the S3 prefix shared by every
# object this notebook writes.
role = get_execution_role()
default_bucket = sagemaker_session.default_bucket()
s3_prefix = "v3-emr-serverless-pipeline"

# Pipeline parameters — overridable per execution when the pipeline starts.
emr_execution_role = ParameterString(
    name="EMRServerlessExecutionRole",
    default_value=f"arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole",
)
spark_script_uri = ParameterString(
    name="SparkScriptUri",
    default_value=f"s3://{default_bucket}/{s3_prefix}/scripts/spark_job.py",
)

print(f"Role: {role}")
print(f"Default Bucket: {default_bucket}")
Create IAM Role for EMR Serverless#
The EMR Serverless job needs an execution role with permissions to access S3 and CloudWatch Logs.
import boto3
import json

# Ensure the EMR Serverless execution role exists. The trust policy allows
# the EMR Serverless service to assume the role on the job's behalf.
iam_client = boto3.client('iam')

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "emr-serverless.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

try:
    iam_client.create_role(
        RoleName="EMRServerlessExecutionRole",
        AssumeRolePolicyDocument=json.dumps(trust_policy),
        Description="Execution role for EMR Serverless",
    )
    print("Role created!")
except iam_client.exceptions.EntityAlreadyExistsException:
    # Idempotent: a previous run already created the role — reuse it.
    print("Role already exists")

# NOTE(review): these managed policies are broad (full S3 and CloudWatch Logs
# access). Fine for a demo; scope them down for production workloads.
# attach_role_policy is idempotent, so re-running this cell is safe.
managed_policy_arns = (
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess",
)
for policy_arn in managed_policy_arns:
    iam_client.attach_role_policy(
        RoleName="EMRServerlessExecutionRole",
        PolicyArn=policy_arn,
    )

print("EMRServerlessExecutionRole is ready!")
print(f"Role ARN: arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole")
!mkdir -p code
%%writefile code/spark_job.py
"""Sample PySpark job for EMR Serverless.

Reads the headerless abalone CSV from --input, prints schema / row count /
sample rows, and writes summary statistics (DataFrame.describe) as Parquet
to --output. Both paths are supplied by the pipeline step via
entryPointArguments.
"""
import argparse
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


def main() -> None:
    # The pipeline's EMRServerlessJobConfig passes --input/--output as
    # spark-submit entry-point arguments.
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=str, required=True, help="Input S3 path")
    parser.add_argument("--output", type=str, required=True, help="Output S3 path")
    args = parser.parse_args()
    # Create Spark session
    spark = SparkSession.builder.appName("EMRServerlessExample").getOrCreate()
    print(f"Reading data from: {args.input}")
    # Define schema for abalone dataset (no headers in CSV)
    schema = StructType([
        StructField("sex", StringType(), True),
        StructField("length", DoubleType(), True),
        StructField("diameter", DoubleType(), True),
        StructField("height", DoubleType(), True),
        StructField("whole_weight", DoubleType(), True),
        StructField("shucked_weight", DoubleType(), True),
        StructField("viscera_weight", DoubleType(), True),
        StructField("shell_weight", DoubleType(), True),
        StructField("rings", DoubleType(), True),
    ])
    # Read input data (no header in abalone dataset)
    df = spark.read.csv(args.input, header=False, schema=schema)
    # Simple transformation - show schema and count
    print("Schema:")
    df.printSchema()
    print(f"Row count: {df.count()}")
    # Show sample data
    print("Sample data:")
    df.show(5)
    # Example transformation - compute per-column summary statistics
    result_df = df.describe()
    # Write output; "overwrite" keeps re-runs idempotent for a fixed prefix
    print(f"Writing results to: {args.output}")
    result_df.write.mode("overwrite").parquet(args.output)
    print("Job completed successfully!")
    spark.stop()


if __name__ == "__main__":
    main()
import boto3

# Stage the Spark entry-point script in S3 where EMR Serverless can read it.
s3_client = boto3.client("s3")
script_s3_key = f"{s3_prefix}/scripts/spark_job.py"
s3_client.upload_file("code/spark_job.py", default_bucket, script_s3_key)

script_s3_uri = f"s3://{default_bucket}/{script_s3_key}"
print(f"Spark script uploaded to: {script_s3_uri}")
Copy Sample Data to Your Bucket#
We copy the sample data from AWS public bucket to your bucket to ensure it’s in the same region as EMR Serverless.
import boto3

# Copy the public abalone sample into this account's bucket so the job reads
# its input from the same region as the EMR Serverless application.
s3_resource = boto3.resource('s3')
dest_key = f"{s3_prefix}/input/abalone.csv"
s3_resource.meta.client.copy(
    {
        'Bucket': 'sagemaker-sample-files',
        'Key': 'datasets/tabular/uci_abalone/abalone.csv',
    },
    default_bucket,
    dest_key,
)

input_data_uri = f"s3://{default_bucket}/{dest_key}"
print(f"Sample data copied to: {input_data_uri}")
Create EMR Serverless Step#
The EMRServerlessStep supports two modes:
Existing Application: Use an existing EMR Serverless application ID
New Application: Create a new EMR Serverless application as part of the step
This notebook uses Option 2 (New Application) so it works out of the box.
# Spark job definition: the entry point, its CLI arguments, the role the job
# runs as, and where driver/executor logs land in S3.
# NOTE(review): the role ARN and script URI are hard-coded here instead of
# referencing the EMRServerlessExecutionRole / SparkScriptUri pipeline
# parameters, so overriding those parameters at execution time will not
# affect this step — confirm whether that is intended.
spark_submit_config = {
    "entryPoint": script_s3_uri,
    "entryPointArguments": [
        "--input", input_data_uri,
        "--output", f"s3://{default_bucket}/{s3_prefix}/output/",
    ],
}
job_config = EMRServerlessJobConfig(
    job_driver={"sparkSubmit": spark_submit_config},
    execution_role_arn=f"arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole",
    configuration_overrides={
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": f"s3://{default_bucket}/{s3_prefix}/logs/",
            }
        }
    },
)
print("EMR Serverless Job Configuration created")
Option 1: Use Existing EMR Serverless Application (Optional)#
If you have an existing EMR Serverless application, you can use it instead. Uncomment the code below and replace your-application-id with your actual application ID.
# Option 1: point the step at an EMR Serverless application that already
# exists. Intentionally left commented out — substitute a real application
# ID and uncomment to use it instead of Option 2.
# step_emr_serverless_existing = EMRServerlessStep(
#     name="EMRServerlessSparkJob",
#     display_name="EMR Serverless Spark Job",
#     description="Run a PySpark job on EMR Serverless",
#     job_config=job_config,
#     application_id="your-application-id",  # Replace with your application ID
# )
print("Option 1 skipped - Using Option 2 (new application) below")
Option 2: Create New EMR Serverless Application (Default)#
This option creates a new EMR Serverless application as part of the pipeline step. The application will auto-start when needed and auto-stop after 15 minutes of idle time.
# Option 2 (default): let the step provision its own EMR Serverless
# application. It auto-starts on demand and auto-stops after 15 idle
# minutes, so no manual application management is required.
application_config = {
    "name": "sagemaker-pipeline-spark-app",
    "releaseLabel": "emr-6.15.0",
    "type": "SPARK",
    "autoStartConfiguration": {"enabled": True},
    "autoStopConfiguration": {"enabled": True, "idleTimeoutMinutes": 15},
}

step_emr_serverless = EMRServerlessStep(
    name="EMRServerlessSparkJob",
    display_name="EMR Serverless Spark Job",
    description="Run a PySpark job with a newly created EMR Serverless application",
    job_config=job_config,
    application_config=application_config,
)

print(f"Step Name: {step_emr_serverless.name}")
print(f"Step Type: {step_emr_serverless.step_type}")
# Assemble the single-step pipeline and render its JSON definition for
# inspection before registering it.
pipeline = Pipeline(
    name="EMRServerlessPipeline",
    parameters=[emr_execution_role, spark_script_uri],
    steps=[step_emr_serverless],
    sagemaker_session=pipeline_session,
)
print("Pipeline created successfully!")

import json

definition = json.loads(pipeline.definition())
print(json.dumps(definition, indent=2))
Execute Pipeline#
The cells below will:
Create/update the pipeline in SageMaker
Start the pipeline execution
Wait for completion
# Register the pipeline (create or update), then kick off one execution.
pipeline.upsert(role_arn=role)
print("Pipeline upserted successfully!")

execution = pipeline.start()
print(f"Pipeline execution started: {execution.arn}")
import time

# Poll every 30 seconds until the execution reaches a terminal state.
TERMINAL_STATES = {'Succeeded', 'Failed', 'Stopped'}
while (status := execution.describe()['PipelineExecutionStatus']) not in TERMINAL_STATES:
    print(f"Status: {status}")
    print("Still running... waiting 30 seconds")
    time.sleep(30)
print(f"Status: {status}")
print(f"Pipeline finished with status: {status}")
# Summarize each step's outcome, surfacing the failure reason when present.
for step in execution.list_steps():
    print(f"Step: {step['StepName']}")
    print(f"  Status: {step['StepStatus']}")
    if 'FailureReason' in step:
        print(f"  Failure Reason: {step['FailureReason']}")
    print()
Cleanup (Optional)#
Uncomment the cell below to delete the pipeline when you’re done.
# Cleanup is destructive and optional, so it is left commented out — the
# original cell said "Uncomment to delete the pipeline" yet ran the delete
# unconditionally, which would remove the pipeline on an end-to-end run.
# Uncomment the three lines below to delete the pipeline when you're done:
# sm_client = sagemaker_session.sagemaker_client
# sm_client.delete_pipeline(PipelineName="EMRServerlessPipeline")
# print("Pipeline deleted")