EMR Serverless Step in SageMaker Pipelines

This notebook demonstrates how to use the EMRServerlessStep to run Spark jobs on EMR Serverless within a SageMaker Pipeline.

Prerequisites

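  • An AWS account with EMR Serverless access, and permission to create IAM roles and attach policies (this notebook creates the EMRServerlessExecutionRole role if it doesn’t exist)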

What This Notebook Does

  1. Creates an IAM role for EMR Serverless (if it doesn’t exist)

  2. Creates a sample PySpark script and uploads it to your S3 bucket

  3. Copies sample data to your S3 bucket

  4. Creates an EMR Serverless step that provisions a new application

  5. Creates and executes a SageMaker Pipeline

from sagemaker.mlops.workflow.emr_serverless_step import (
    EMRServerlessStep,
    EMRServerlessJobConfig,
)
from sagemaker.mlops.workflow.pipeline import Pipeline
from sagemaker.core.workflow.parameters import ParameterString
from sagemaker.core.workflow.pipeline_context import PipelineSession
from sagemaker.core.helper.session_helper import Session, get_execution_role
# Two sessions are created: a regular Session for region/account/bucket
# lookups, and a PipelineSession that is handed to the Pipeline object.
pipeline_session = PipelineSession()
sagemaker_session = Session()

# account_id is used below to build the EMR Serverless execution role ARN.
account_id = sagemaker_session.account_id()
region = sagemaker_session.boto_region_name

print(f"Region: {region}\nAccount ID: {account_id}")
# Everything this notebook stores in S3 (script, input data, output, logs)
# lives under s3://<default_bucket>/<s3_prefix>/.
s3_prefix = "v3-emr-serverless-pipeline"
default_bucket = sagemaker_session.default_bucket()

# Role passed to pipeline.upsert() when the pipeline is created/updated.
role = get_execution_role()

# Pipeline parameters (registered on the Pipeline object further down).
# NOTE(review): the job configuration later in this notebook builds its role
# ARN and script URI directly instead of reading these parameters, so
# overriding them at execution time appears to have no effect -- confirm intent.
spark_script_uri = ParameterString(
    name="SparkScriptUri",
    default_value=f"s3://{default_bucket}/{s3_prefix}/scripts/spark_job.py",
)

emr_execution_role = ParameterString(
    name="EMRServerlessExecutionRole",
    default_value=f"arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole",
)

print(f"Role: {role}\nDefault Bucket: {default_bucket}")

Create IAM Role for EMR Serverless

The EMR Serverless job needs an execution role with permissions to access S3 and CloudWatch Logs.

import boto3
import json

# Make sure the EMR Serverless execution role exists (it is created on the
# first run and reused afterwards).
iam_client = boto3.client('iam')

# Trust policy: allows the EMR Serverless service to assume this role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "emr-serverless.amazonaws.com"},
            "Action": "sts:AssumeRole",
        },
    ],
}

try:
    iam_client.create_role(
        RoleName="EMRServerlessExecutionRole",
        AssumeRolePolicyDocument=json.dumps(trust_policy),
        Description="Execution role for EMR Serverless",
    )
except iam_client.exceptions.EntityAlreadyExistsException:
    # Re-running the notebook is fine: the existing role is kept as-is.
    print("Role already exists")
else:
    print("Role created!")

# Managed policies giving the job access to S3 and CloudWatch Logs.
# NOTE(review): both are full-access managed policies, which is convenient for
# a demo; consider scoping them down before reusing this role elsewhere.
managed_policy_arns = (
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess",
)
for policy_arn in managed_policy_arns:
    iam_client.attach_role_policy(
        RoleName="EMRServerlessExecutionRole",
        PolicyArn=policy_arn,
    )

print("EMRServerlessExecutionRole is ready!")
print(f"Role ARN: arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole")
!mkdir -p code
%%writefile code/spark_job.py

"""Sample PySpark job for EMR Serverless."""
import argparse
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


def main():
    """Summarise the abalone CSV at ``--input`` and write the result to ``--output``.

    Command-line arguments:
        --input:  path of the headerless abalone CSV to read (required).
        --output: path the summary is written to as Parquet; anything already
            at that path is overwritten (required).

    The job prints the schema, the row count and the first five rows, then
    writes the output of ``DataFrame.describe()``. The Spark session is
    stopped even if one of those steps raises.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=str, required=True, help="Input S3 path")
    parser.add_argument("--output", type=str, required=True, help="Output S3 path")
    args = parser.parse_args()

    # Create Spark session
    spark = SparkSession.builder.appName("EMRServerlessExample").getOrCreate()

    try:
        print(f"Reading data from: {args.input}")

        # Define schema for abalone dataset (no headers in CSV)
        schema = StructType([
            StructField("sex", StringType(), True),
            StructField("length", DoubleType(), True),
            StructField("diameter", DoubleType(), True),
            StructField("height", DoubleType(), True),
            StructField("whole_weight", DoubleType(), True),
            StructField("shucked_weight", DoubleType(), True),
            StructField("viscera_weight", DoubleType(), True),
            StructField("shell_weight", DoubleType(), True),
            StructField("rings", DoubleType(), True),
        ])

        # Read input data (no header in abalone dataset)
        df = spark.read.csv(args.input, header=False, schema=schema)

        # Simple transformation - show schema and count
        print("Schema:")
        df.printSchema()
        print(f"Row count: {df.count()}")

        # Show sample data
        print("Sample data:")
        df.show(5)

        # Example transformation - compute statistics
        result_df = df.describe()

        # Write output
        print(f"Writing results to: {args.output}")
        result_df.write.mode("overwrite").parquet(args.output)

        print("Job completed successfully!")
    finally:
        # Release the session on every exit path; previously a failed read or
        # write skipped spark.stop() entirely.
        spark.stop()


if __name__ == "__main__":
    main()
import boto3

# Upload the local Spark script to S3; its URI is used as the job entry point.
script_s3_key = f"{s3_prefix}/scripts/spark_job.py"
s3_client = boto3.client("s3")

s3_client.upload_file(
    Filename="code/spark_job.py",
    Bucket=default_bucket,
    Key=script_s3_key,
)

script_s3_uri = f"s3://{default_bucket}/{script_s3_key}"
print(f"Spark script uploaded to: {script_s3_uri}")

Copy Sample Data to Your Bucket

We copy the sample data from the public AWS sample-files bucket to your own bucket so that it is in the same region as EMR Serverless.

import boto3

# Copy the abalone CSV from the public sample bucket into this account's
# default bucket (same region as EMR Serverless).
dest_key = f"{s3_prefix}/input/abalone.csv"
copy_source = {
    'Bucket': 'sagemaker-sample-files',
    'Key': 'datasets/tabular/uci_abalone/abalone.csv',
}

s3_resource = boto3.resource('s3')
s3_resource.meta.client.copy(
    CopySource=copy_source,
    Bucket=default_bucket,
    Key=dest_key,
)

input_data_uri = f"s3://{default_bucket}/{dest_key}"
print(f"Sample data copied to: {input_data_uri}")

Create EMR Serverless Step

The EMRServerlessStep supports two modes:

  1. Existing Application: Use an existing EMR Serverless application ID

  2. New Application: Create a new EMR Serverless application as part of the step

This notebook uses Option 2 (New Application) so it works out of the box.

# Describe the Spark job that the pipeline step submits to EMR Serverless.

# spark-submit settings: the uploaded script plus the --input/--output
# arguments that code/spark_job.py parses.
spark_submit = {
    "entryPoint": script_s3_uri,
    "entryPointArguments": [
        "--input", input_data_uri,
        "--output", f"s3://{default_bucket}/{s3_prefix}/output/",
    ],
}

# Job logs are written under the notebook's S3 prefix.
s3_monitoring = {"logUri": f"s3://{default_bucket}/{s3_prefix}/logs/"}

job_config = EMRServerlessJobConfig(
    job_driver={"sparkSubmit": spark_submit},
    execution_role_arn=f"arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole",
    configuration_overrides={
        "monitoringConfiguration": {"s3MonitoringConfiguration": s3_monitoring},
    },
)

print("EMR Serverless Job Configuration created")

Option 1: Use Existing EMR Serverless Application (Optional)

If you have an existing EMR Serverless application, you can use it instead. Uncomment the code below and replace your-application-id with your actual application ID.

# Option 1: Use an existing EMR Serverless application
# Uncomment below if you have an existing application.
# NOTE: the Pipeline cell later in this notebook is built with
# steps=[step_emr_serverless]; to actually run Option 1, pass
# step_emr_serverless_existing in that steps list instead.

# step_emr_serverless_existing = EMRServerlessStep(
#     name="EMRServerlessSparkJob",
#     display_name="EMR Serverless Spark Job",
#     description="Run a PySpark job on EMR Serverless",
#     job_config=job_config,
#     application_id="your-application-id",  # Replace with your application ID
# )

# This message is printed unconditionally, whether or not the block above
# has been uncommented.
print("Option 1 skipped - Using Option 2 (new application) below")

Option 2: Create New EMR Serverless Application (Default)

This option creates a new EMR Serverless application as part of the pipeline step. The application will auto-start when needed and auto-stop after 15 minutes of idle time.

# Option 2 (default): the step creates its own EMR Serverless application, so
# the notebook works without any pre-existing application.

# Settings for the application the step creates: a Spark application that
# starts automatically and stops after 15 idle minutes.
new_application_config = {
    "name": "sagemaker-pipeline-spark-app",
    "releaseLabel": "emr-6.15.0",
    "type": "SPARK",
    "autoStartConfiguration": {"enabled": True},
    "autoStopConfiguration": {"enabled": True, "idleTimeoutMinutes": 15},
}

step_emr_serverless = EMRServerlessStep(
    name="EMRServerlessSparkJob",
    display_name="EMR Serverless Spark Job",
    description="Run a PySpark job with a newly created EMR Serverless application",
    job_config=job_config,
    application_config=new_application_config,
)

print(f"Step Name: {step_emr_serverless.name}")
print(f"Step Type: {step_emr_serverless.step_type}")
# Assemble the pipeline: the two declared parameters plus the single
# EMR Serverless step, bound to the PipelineSession created earlier.
pipeline_parameters = [emr_execution_role, spark_script_uri]
pipeline_steps = [step_emr_serverless]

pipeline = Pipeline(
    name="EMRServerlessPipeline",
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=pipeline_session,
)

print("Pipeline created successfully!")
import json

# pipeline.definition() is parsed with json.loads and re-serialized so the
# definition prints with 2-space indentation.
raw_definition = pipeline.definition()
definition = json.loads(raw_definition)
pretty_definition = json.dumps(definition, indent=2)
print(pretty_definition)

Execute Pipeline

The cells below will:

  1. Create/update the pipeline in SageMaker

  2. Start the pipeline execution

  3. Wait for completion

# Create/update the pipeline
# `role` is the execution role resolved near the top of the notebook.
pipeline.upsert(role_arn=role)
print("Pipeline upserted successfully!")
# Start the pipeline execution
# The returned `execution` handle is polled and inspected by the cells below.
execution = pipeline.start()
print(f"Pipeline execution started: {execution.arn}")
import time

# Poll the execution every 30 seconds until it reaches one of the statuses
# this notebook treats as final. There is no overall timeout.
terminal_statuses = {'Succeeded', 'Failed', 'Stopped'}

status = execution.describe()['PipelineExecutionStatus']
print(f"Status: {status}")

while status not in terminal_statuses:
    print("Still running... waiting 30 seconds")
    time.sleep(30)
    status = execution.describe()['PipelineExecutionStatus']
    print(f"Status: {status}")

print(f"Pipeline finished with status: {status}")
# Report each step's name and status; a failure reason is shown only for
# steps that carry one. A blank line separates the steps.
steps = execution.list_steps()
for step in steps:
    report_lines = [
        f"Step: {step['StepName']}",
        f"  Status: {step['StepStatus']}",
    ]
    if 'FailureReason' in step:
        report_lines.append(f"  Failure Reason: {step['FailureReason']}")
    print("\n".join(report_lines))
    print()

Cleanup (Optional)

Uncomment the cell below to delete the pipeline when you’re done.

# Uncomment to delete the pipeline.
# Kept commented out so that "Run All" does not remove the pipeline that was
# just created. This only deletes the pipeline; the IAM role and the S3
# objects created earlier in the notebook are left in place.
# sm_client = sagemaker_session.sagemaker_client
# sm_client.delete_pipeline(PipelineName="EMRServerlessPipeline")
# print("Pipeline deleted")