sagemaker.mlops.workflow#

SageMaker workflow orchestration module.

This module provides pipeline and step orchestration capabilities for SageMaker workflows. It contains the high-level classes that orchestrate training, processing, and serving components from the train and serve packages.

Key components:

  • Pipeline: Main workflow orchestration class

  • Steps: Various step implementations (TrainingStep, ProcessingStep, etc.)

  • Configuration: Pipeline configuration classes

  • Utilities: Helper functions for workflow management

Note: This module imports from sagemaker.core.workflow for primitives (entities, parameters, functions, conditions, properties) and can import from sagemaker.train and sagemaker.serve for orchestration purposes.
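The steps and parameters managed by this module are ultimately serialized into a JSON pipeline definition. The skeleton below is an illustrative sketch of that document's shape; the field names approximate the pipeline definition schema, and Pipeline.definition() is the authoritative source of the real output:

```python
import json

# Illustrative skeleton of the JSON document a Pipeline serializes to.
# Field names are approximations of the pipeline definition schema.
definition = {
    "Version": "2020-12-01",
    "Parameters": [
        {"Name": "InstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}
    ],
    "Steps": [
        {"Name": "Preprocess", "Type": "Processing", "Arguments": {}},
        {"Name": "Train", "Type": "Training", "Arguments": {},
         "DependsOn": ["Preprocess"]},
    ],
}

print(json.dumps(definition, indent=2))
```

Step dependencies can be expressed explicitly (DependsOn above) or implicitly, by referencing one step's properties in another step's arguments.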

class sagemaker.mlops.workflow.AutoMLStep(name: str, step_args: _JobStepArguments, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

AutoMLStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_auto_ml_job.

NOTE: The CreateAutoMLJob request is not quite the args list that workflow needs. The ModelDeployConfig and GenerateCandidateDefinitionsOnly attributes cannot be included.

get_best_auto_ml_model_builder(role, sagemaker_session=None)[source]#

Get the model artifacts, image URI, and environment variables for the best candidate model.

Parameters:
  • role (str) – An AWS IAM role (either name or full ARN). The Amazon SageMaker AutoML jobs and APIs that create Amazon SageMaker endpoints use this role to access training data and model artifacts.

  • sagemaker_session (sagemaker.core.helper.session.Session) –

    A SageMaker Session object, used for SageMaker interactions. If the best model will be used as part of a ModelStep, then sagemaker_session should be a sagemaker.workflow.pipeline_context.PipelineSession. Example:

    model = Model(sagemaker_session=PipelineSession())
    model_step = ModelStep(step_args=model.register())
    

property properties#

A Properties object representing the DescribeAutoMLJobResponse data model.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration.

class sagemaker.mlops.workflow.CacheConfig(enable_caching: bool = False, expire_after=None)[source]#

Bases: object

Configuration class to enable caching in SageMaker Pipelines Workflows.

If caching is enabled, the pipeline attempts to find a previous execution of a Step that was called with the same arguments. Step caching only considers successful executions. If a successful previous execution is found, the pipeline propagates the values from the previous execution rather than recomputing the Step. When multiple successful executions exist within the timeout period, the pipeline uses the result of the most recent successful execution.

enable_caching#

To enable Step caching. Defaults to False.

Type:

bool

expire_after#

If Step caching is enabled, a timeout also needs to be defined. It defines how old a previous execution can be to be considered for reuse. The value should be an ISO 8601 duration string. Defaults to None.

Examples:

'P30D' # 30 days
'P4DT12H' # 4 days and 12 hours
'PT12H' # 12 hours
Type:

str

property config#

Configures Step caching for SageMaker Pipelines Workflows.

enable_caching: bool#
class sagemaker.mlops.workflow.CallbackOutput(output_name: str | None = None, output_type: CallbackOutputTypeEnum = CallbackOutputTypeEnum.String)[source]#

Bases: object

Output for a callback step.

output_name#

The output name

Type:

str

output_type#

The output type

Type:

CallbackOutputTypeEnum

expr(step_name) Dict[str, str][source]#

The ‘Get’ expression dict for a CallbackOutput.

output_name: str#
output_type: CallbackOutputTypeEnum#
to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Get the request structure for workflow service calls.

class sagemaker.mlops.workflow.CallbackStep(name: str, sqs_queue_url: str, inputs: dict, outputs: List[CallbackOutput], display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None)[source]#

Bases: Step

Callback step for workflow.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that is used to define the callback step.

property properties#

A Properties object representing the output parameters of the callback step.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration.
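The output parameters declared on a CallbackStep are referenced by later steps through 'Get' expressions. The helper below is a hypothetical sketch of the expression shape that CallbackOutput.expr produces; the exact path format is illustrative:

```python
# Hypothetical sketch of the 'Get' expression returned by
# CallbackOutput.expr(step_name); the path format is an approximation.
def callback_output_expr(step_name: str, output_name: str) -> dict:
    return {"Get": f"Steps.{step_name}.OutputParameters['{output_name}']"}

print(callback_output_expr("MyCallback", "result"))
```

At runtime, the message sent to the SQS queue carries a callback token; the external worker reports success or failure, along with values for the declared outputs, back to the pipeline.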

class sagemaker.mlops.workflow.CheckJobConfig(role, instance_count=1, instance_type='ml.m5.xlarge', volume_size_in_gb=30, volume_kms_key=None, output_kms_key=None, max_runtime_in_seconds=None, base_job_name=None, sagemaker_session=None, env=None, tags=None, network_config=None)[source]#

Bases: object

Check job config for QualityCheckStep and ClarifyCheckStep.

class sagemaker.mlops.workflow.ClarifyCheckStep(name: str, clarify_check_config: ClarifyCheckConfig, check_job_config: CheckJobConfig, skip_check: bool | PipelineVariable = False, fail_on_violation: bool | PipelineVariable = True, register_new_baseline: bool | PipelineVariable = False, model_package_group_name: str | PipelineVariable | None = None, supplied_baseline_constraints: str | PipelineVariable | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None)[source]#

Bases: Step

ClarifyCheckStep step for workflow.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that is used to define the ClarifyCheck step.

property properties#

A Properties object representing the output parameters of the ClarifyCheck step.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration etc.

class sagemaker.mlops.workflow.ConditionStep(name: str, depends_on: List[str | Step] | None = None, display_name: str | None = None, description: str | None = None, conditions: List[Condition] | None = None, if_steps: List[Step] | None = None, else_steps: List[Step] | None = None)[source]#

Bases: Step

Conditional step for pipelines to support conditional branching in the execution of steps.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that is used to define the conditional branching in the pipeline.

property properties#

A simple Properties object with Outcome as the only property.

property step_only_arguments#

Argument dict pertaining to the step only, and not the if_steps or else_steps.
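The branching arguments serialize into three lists: the conditions to evaluate and the steps for each branch. The sketch below is a hypothetical approximation of that shape, with made-up step names:

```python
# Illustrative shape of ConditionStep.arguments; field names are approximate.
def condition_step_arguments(conditions, if_steps, else_steps):
    return {
        "Conditions": conditions,
        "IfSteps": if_steps,
        "ElseSteps": else_steps,
    }

args = condition_step_arguments(
    conditions=[{"Type": "GreaterThanOrEqualTo", "LeftValue": 0.92, "RightValue": 0.8}],
    if_steps=[{"Name": "RegisterModel"}],
    else_steps=[{"Name": "FailBelowThreshold"}],
)
print(args["Conditions"][0]["Type"])  # GreaterThanOrEqualTo
```

All conditions must evaluate to true for the if_steps branch to run; otherwise the else_steps branch runs.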

class sagemaker.mlops.workflow.ConfigurableRetryStep(name: str, step_type: StepTypeEnum, display_name: str | None = None, description: str | None = None, depends_on: List[str | Step | StepCollection] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: Step

ConfigurableRetryStep for SageMaker Pipelines Workflows.

add_retry_policy(retry_policy: RetryPolicy)[source]#

Add a policy to the current ConfigurableRetryStep retry policies list.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for ConfigurableRetryStep.

class sagemaker.mlops.workflow.EMRStep(name: str, display_name: str, description: str, cluster_id: str, step_config: EMRStepConfig, depends_on: List[str | Step] | None = None, cache_config: CacheConfig | None = None, cluster_config: Dict[str, Any] | None = None, execution_role_arn: str | None = None)[source]#

Bases: Step

EMR step for workflow.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that is used to call AddJobFlowSteps.

NOTE: The AddJobFlowSteps request is not quite the args list that workflow needs. The Name attribute in AddJobFlowSteps cannot be passed; it is set at runtime. In addition, the EMR job inputs and output config also need to be included.

property properties: Dict[str, Any] | List[Dict[str, Any]]#

A Properties object representing the EMR DescribeStepResponse model

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration.

class sagemaker.mlops.workflow.EMRStepConfig(jar, args: List[str] | None = None, main_class: str | None = None, properties: List[dict] | None = None, output_args: dict[str, str] | None = None)[source]#

Bases: object

Config for a Hadoop Jar step.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Convert EMRStepConfig object to request dict.
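The config maps onto the EMR HadoopJarStep request shape. The helper below is a hypothetical sketch of what to_request assembles (field names are illustrative, and the S3 paths are made up):

```python
# Mirrors the EMR HadoopJarStep shape; field names are illustrative.
def emr_step_config_request(jar, args=None, main_class=None, properties=None):
    request = {"Jar": jar}
    if args is not None:
        request["Args"] = args
    if main_class is not None:
        request["MainClass"] = main_class
    if properties is not None:
        request["Properties"] = properties
    return request

print(emr_step_config_request(
    jar="s3://my-bucket/my-spark-job.jar",
    args=["--input", "s3://my-bucket/input"],
    main_class="com.example.Main",
))
```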

class sagemaker.mlops.workflow.FailStep(name: str, error_message: str | PipelineVariable | None = None, display_name: str | None = None, description: str | None = None, depends_on: List[str | Step] | None = None)[source]#

Bases: Step

FailStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to define the FailStep.

property properties#

A Properties object is not available for the FailStep.

Executing a FailStep will terminate the pipeline. FailStep properties should not be referenced.
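Since a FailStep halts the execution, its only service-facing argument is the error message, which may be a plain string or a serialized pipeline expression. A hypothetical sketch of that shape (the 'Std:Join' form is an approximation):

```python
# Illustrative shape of FailStep.arguments. The error message may also be a
# serialized join expression over pipeline variables (approximated below).
def fail_step_arguments(error_message) -> dict:
    return {"ErrorMessage": error_message}

plain = fail_step_arguments("Execution failed due to low accuracy")
joined = fail_step_arguments(
    {"Std:Join": {"On": " ",
                  "Values": ["MSE above threshold:", {"Get": "Parameters.MseThreshold"}]}}
)
print(plain)
```

A FailStep is commonly placed in the else_steps branch of a ConditionStep to terminate the pipeline with a descriptive reason.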

class sagemaker.mlops.workflow.LambdaOutput(output_name: str | None = None, output_type: LambdaOutputTypeEnum = LambdaOutputTypeEnum.String)[source]#

Bases: object

Output for a lambda step.

output_name#

The output name

Type:

str

output_type#

The output type

Type:

LambdaOutputTypeEnum

expr(step_name) Dict[str, str][source]#

The ‘Get’ expression dict for a LambdaOutput.

output_name: str#
output_type: LambdaOutputTypeEnum#
to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Get the request structure for workflow service calls.

class sagemaker.mlops.workflow.LambdaStep(name: str, lambda_func: Lambda, display_name: str | None = None, description: str | None = None, inputs: dict | None = None, outputs: List[LambdaOutput] | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step | StepCollection] | None = None)[source]#

Bases: Step

Lambda step for workflow.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that is used to define the lambda step.

property properties#

A Properties object representing the output parameters of the lambda step.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration.
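Each declared LambdaOutput tells the pipeline which key to read out of the Lambda function's JSON result and the type to interpret it as. A hypothetical sketch of the declaration shape:

```python
# Illustrative request shape for a LambdaOutput declaration; the Lambda
# function's JSON result is expected to contain a key per output name.
def lambda_output_request(output_name: str, output_type: str = "String") -> dict:
    return {"OutputName": output_name, "OutputType": output_type}

outputs = [
    lambda_output_request("statistics"),
    lambda_output_request("count", "Integer"),
]
print(outputs[1])  # {'OutputName': 'count', 'OutputType': 'Integer'}
```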

class sagemaker.mlops.workflow.ModelStep(name: str, step_args: _ModelStepArguments | Dict, depends_on: List[str | Step | StepCollection] | None = None, retry_policies: List[RetryPolicy] | Dict[str, List[RetryPolicy]] | None = None, display_name: str | None = None, description: str | None = None, repack_model_step_settings: Dict[str, any] | None = None)[source]#

Bases: ConfigurableRetryStep

ModelStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that are used to call the appropriate SageMaker API.

property properties#

A Properties object representing the appropriate SageMaker response data model.

class sagemaker.mlops.workflow.MonitorBatchTransformStep(name: str, transform_step_args: _JobStepArguments, monitor_configuration: QualityCheckConfig | ClarifyCheckConfig, check_job_configuration: CheckJobConfig, monitor_before_transform: bool = False, fail_on_violation: bool | PipelineVariable = True, supplied_baseline_statistics: str | PipelineVariable | None = None, supplied_baseline_constraints: str | PipelineVariable | None = None, display_name: str | None = None, description: str | None = None)[source]#

Bases: StepCollection

Creates a Transform step paired with a QualityCheck or ClarifyCheck step.

Used to monitor the inputs and outputs of the batch transform job.

class sagemaker.mlops.workflow.NotebookJobStep(input_notebook: str, image_uri: str, kernel_name: str, name: str | None = None, display_name: str | None = None, description: str | None = None, notebook_job_name: str | None = None, role: str | None = None, s3_root_uri: str | None = None, parameters: Dict[str, str | PipelineVariable] | None = None, environment_variables: Dict[str, str | PipelineVariable] | None = None, initialization_script: str | None = None, s3_kms_key: str | PipelineVariable | None = None, instance_type: str | PipelineVariable | None = 'ml.m5.large', volume_size: int | PipelineVariable = 30, volume_kms_key: str | PipelineVariable | None = None, encrypt_inter_container_traffic: bool | PipelineVariable | None = True, security_group_ids: List[str | PipelineVariable] | None = None, subnets: List[str | PipelineVariable] | None = None, max_retry_attempts: int = 1, max_runtime_in_seconds: int = 172800, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, additional_dependencies: List[str] | None = None, retry_policies: List[RetryPolicy] | None = None, depends_on: List[Step | StepOutput] | None = None)[source]#

Bases: ConfigurableRetryStep

NotebookJobStep for SageMaker Pipelines Workflows.

For more details about SageMaker Notebook Jobs, see SageMaker Notebook Jobs.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

Generates the arguments dictionary that is used to create the job.

property depends_on: List[str | Step | StepCollection | StepOutput] | None#

The list of steps the current Step depends on.

property properties#

A Properties object representing the notebook job step output

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for workflow service calls.

class sagemaker.mlops.workflow.ParallelismConfiguration(max_parallel_execution_steps: int)[source]#

Bases: object

Parallelism config for SageMaker pipeline.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Returns: the request structure.

class sagemaker.mlops.workflow.Pipeline(name: str = '', parameters: ~typing.Sequence[~sagemaker.core.workflow.parameters.Parameter] | None = None, pipeline_experiment_config: ~sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfig | None = <sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfig object>, mlflow_config: ~sagemaker.core.shapes.shapes.MlflowConfig | None = None, steps: ~typing.Sequence[~sagemaker.mlops.workflow.steps.Step | ~sagemaker.core.workflow.step_outputs.StepOutput] | None = None, sagemaker_session: ~sagemaker.core.helper.session_helper.Session | None = None, pipeline_definition_config: ~sagemaker.core.workflow.pipeline_definition_config.PipelineDefinitionConfig | None = <sagemaker.core.workflow.pipeline_definition_config.PipelineDefinitionConfig object>)[source]#

Bases: object

Pipeline for workflow.

build_parameters_from_execution(pipeline_execution_arn: str, parameter_value_overrides: Dict[str, str | bool | int | float] | None = None) Dict[str, str | bool | int | float][source]#

Gets the parameters from an execution, update with optional parameter value overrides.

Parameters:
  • pipeline_execution_arn (str) – The arn of the reference pipeline execution.

  • parameter_value_overrides (Dict[str, Union[str, bool, int, float]]) – Parameter dict to be updated with the parameters from the referenced execution.

Returns:

A parameter dict built from an execution and provided parameter value overrides.

create(role_arn: str = None, description: str = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, parallelism_config: ParallelismConfiguration = None) Dict[str, Any][source]#

Creates a Pipeline in the Pipelines service.

Parameters:
  • role_arn (str) – The role arn that is assumed by the pipeline to create step artifacts.

  • description (str) – A description of the pipeline.

  • tags (Optional[Tags]) – Tags to be passed to the pipeline.

  • parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.

Returns:

A response dict from the service.

definition() str[source]#

Converts a request structure to string representation for workflow service calls.

Returns:

A JSON formatted string of pipeline definition.

delete() Dict[str, Any][source]#

Deletes a Pipeline in the Workflow service.

Returns:

A response dict from the service.

delete_triggers(trigger_names: List[str])[source]#

Delete Triggers for a parent SageMaker Pipeline if they exist.

Parameters:

trigger_names (List[str]) – List of trigger names to be deleted. Currently, these can only be EventBridge schedule names.

describe(pipeline_version_id: int | None = None) Dict[str, Any][source]#

Describes a Pipeline in the Workflow service.

Parameters:

pipeline_version_id (Optional[int]) – The version ID of the pipeline to describe.

Returns:

Response dict from the service. See boto3 client documentation

describe_trigger(trigger_name: str) Dict[str, Any][source]#

Describe Trigger for a parent SageMaker Pipeline.

Parameters:

trigger_name (str) – Trigger name to be described. Currently, this can only be an EventBridge schedule name.

Returns:

Trigger describe responses from EventBridge.

Return type:

Dict[str, str]

property latest_pipeline_version_id#

Retrieves the latest version ID of this pipeline.

list_executions(sort_by: str | None = None, sort_order: str | None = None, max_results: int | None = None, next_token: str | None = None) Dict[str, Any][source]#

Lists a pipeline’s executions.

Parameters:
  • sort_by (str) – The field by which to sort results (CreationTime/PipelineExecutionArn).

  • sort_order (str) – The sort order for results (Ascending/Descending).

  • max_results (int) – The maximum number of pipeline executions to return in the response.

  • next_token (str) – If the result of the previous ListPipelineExecutions request was truncated, the response includes a NextToken. To retrieve the next set of pipeline executions, use the token in the next request.

Returns:

List of Pipeline Execution Summaries. See boto3 client list_pipeline_executions https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_pipeline_executions

list_pipeline_versions(sort_order: str | None = None, max_results: int | None = None, next_token: str | None = None) str[source]#

Lists a pipeline’s versions.

Parameters:
  • sort_order (str) – The sort order for results (Ascending/Descending).

  • max_results (int) – The maximum number of pipeline versions to return in the response.

  • next_token (str) – If the result of the previous ListPipelineVersions request was truncated, the response includes a NextToken. To retrieve the next set of pipeline versions, use the token in the next request.

Returns:

List of Pipeline Version Summaries. See boto3 client list_pipeline_versions https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/list_pipeline_versions.html#

put_triggers(triggers: List[Trigger], role_arn: str | None = None) List[str][source]#

Attach triggers to a parent SageMaker Pipeline.

Parameters:
  • triggers (List[Trigger]) – List of supported triggers. Currently, this can only be of type PipelineSchedule.

  • role_arn (str) – The role arn that is assumed by EventBridge service.

Returns:

Successfully created trigger ARN(s). Currently, the Python SDK only supports PipelineSchedule triggers, so this is a list of EventBridge Schedule ARN(s) that were created or upserted.

Return type:

List[str]

start(parameters: Dict[str, str | bool | int | float] = None, execution_display_name: str = None, execution_description: str = None, parallelism_config: ParallelismConfiguration = None, selective_execution_config: SelectiveExecutionConfig = None, mlflow_experiment_name: str = None, pipeline_version_id: int = None)[source]#

Starts a Pipeline execution in the Workflow service.

Parameters:
  • parameters (Dict[str, Union[str, bool, int, float]]) – values to override pipeline parameters.

  • execution_display_name (str) – The display name of the pipeline execution.

  • execution_description (str) – A description of the execution.

  • parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.

  • selective_execution_config (Optional[SelectiveExecutionConfig]) – The configuration for selective step execution.

  • mlflow_experiment_name (str) – Optional MLflow experiment name to override the experiment name specified in the pipeline’s mlflow_config. If provided, this will override the experiment name for this specific pipeline execution only, without modifying the pipeline definition.

  • pipeline_version_id (Optional[int]) – The version ID of the pipeline to start the execution from. If not specified, the latest version ID is used.

Returns:

A _PipelineExecution instance, if successful.

update(role_arn: str | None = None, description: str | None = None, parallelism_config: ParallelismConfiguration | None = None) Dict[str, Any][source]#

Updates a Pipeline in the Workflow service.

Parameters:
  • role_arn (str) – The role arn that is assumed by pipelines to create step artifacts.

  • description (str) – A description of the pipeline.

  • parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.

Returns:

A response dict from the service.

upsert(role_arn: str | None = None, description: str | None = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, parallelism_config: ParallelismConfiguration | None = None) Dict[str, Any][source]#

Creates a pipeline or updates it, if it already exists.

Parameters:
  • role_arn (str) – The role arn that is assumed by workflow to create step artifacts.

  • description (str) – A description of the pipeline.

  • tags (Optional[Tags]) – Tags to be passed.

  • parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.

Returns:

response dict from service
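The upsert semantics can be sketched as "try create, fall back to update when the pipeline already exists". The client and exception classes below are hypothetical stand-ins, not SDK types; they only illustrate the control flow:

```python
# Hypothetical stand-ins illustrating upsert control flow; not SDK types.
class PipelineExistsError(Exception):
    pass

class FakePipelineClient:
    def __init__(self):
        self.pipelines = {}

    def create(self, name, definition):
        if name in self.pipelines:
            raise PipelineExistsError(name)
        self.pipelines[name] = definition
        return {"PipelineArn": f"arn:aws:sagemaker:::pipeline/{name}"}

    def update(self, name, definition):
        self.pipelines[name] = definition
        return {"PipelineArn": f"arn:aws:sagemaker:::pipeline/{name}"}

def upsert(client, name, definition):
    # Create the pipeline; if it already exists, update it in place.
    try:
        return client.create(name, definition)
    except PipelineExistsError:
        return client.update(name, definition)

client = FakePipelineClient()
upsert(client, "my-pipeline", {"Steps": []})
arn = upsert(client, "my-pipeline", {"Steps": [{"Name": "Train"}]})["PipelineArn"]
```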

class sagemaker.mlops.workflow.PipelineExperimentConfig(experiment_name: str | Parameter | ExecutionVariable | PipelineVariable, trial_name: str | Parameter | ExecutionVariable | PipelineVariable)[source]#

Bases: Entity

Experiment config for SageMaker pipeline.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Returns: the request structure.

class sagemaker.mlops.workflow.PipelineExperimentConfigProperties[source]#

Bases: object

Enum-like class for all pipeline experiment config property references.

EXPERIMENT_NAME = <sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfigProperty object>#
TRIAL_NAME = <sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfigProperty object>#
class sagemaker.mlops.workflow.PipelineExperimentConfigProperty(name: str)[source]#

Bases: PipelineVariable

Reference to pipeline experiment config property.

property expr: Dict[str, Any] | List[Dict[str, Any]]#

The ‘Get’ expression dict for a pipeline experiment config property.

class sagemaker.mlops.workflow.PipelineGraph(steps: Sequence[Step])[source]#

Bases: object

Helper class representing the Pipeline Directed Acyclic Graph (DAG)

steps#

Sequence of `Step`s that represent each node in the pipeline DAG

Type:

Sequence[Step]

classmethod from_pipeline(pipeline: Pipeline)[source]#

Create a PipelineGraph object from the Pipeline object.

get_steps_in_sub_dag(current_step: Step, sub_dag_steps: Set[str] | None = None) Set[str][source]#

Get names of all steps (including current step) in the sub dag of current step.

Returns a set of step names in the sub dag.

is_cyclic() bool[source]#

Check if this pipeline graph is cyclic.

Returns true if it is cyclic, false otherwise.
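A cycle check over step dependencies is a standard depth-first traversal with three node colors. The function below is a minimal sketch of what such a check verifies, over a plain step-to-dependencies dict rather than real Step objects:

```python
# Minimal DFS cycle check over a step -> dependencies adjacency dict,
# sketching the property PipelineGraph.is_cyclic() verifies.
def is_cyclic(adjacency: dict) -> bool:
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on current path, done
    color = {node: WHITE for node in adjacency}

    def visit(node) -> bool:
        color[node] = GRAY
        for neighbor in adjacency.get(node, ()):
            if color.get(neighbor, WHITE) == GRAY:
                return True  # back edge: cycle found
            if color.get(neighbor, WHITE) == WHITE and visit(neighbor):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in list(color))

print(is_cyclic({"a": ["b"], "b": ["c"], "c": ["a"]}))  # True
```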

class sagemaker.mlops.workflow.PipelineSchedule(name: str | None = None, enabled: bool | None = True, start_date: datetime | None = None, at: datetime | None = None, rate: tuple | None = None, cron: str | None = None)[source]#

Bases: Trigger

Pipeline Schedule trigger type used to create EventBridge Schedules for SageMaker Pipelines.

To create a pipeline schedule, specify a single type using the at, rate, or cron parameters. For more information about EventBridge syntax, see Schedule types on EventBridge Scheduler.

Parameters:
  • start_date (datetime) – The start date of the schedule. Default is datetime.now().

  • at (datetime) – An “At” EventBridge expression. Defaults to UTC timezone. Note that if you use datetime.now(), the result is a snapshot of your current local time, while EventBridge requires a time in UTC format. You can convert the result of datetime.now() to UTC with datetime.utcnow() or datetime.now(tz=pytz.utc). For example, you can create a time two minutes from now with the expression datetime.now(tz=pytz.utc) + timedelta(0, 120).

  • rate (tuple) – A “Rate” EventBridge expression. Format is (value, unit).

  • cron (str) – A “Cron” EventBridge expression. Format is “minutes hours day-of-month month day-of-week year”.

  • name (str) – The schedule name. Default is None.

  • enabled (boolean) – If the schedule is enabled. Defaults to True.
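The three schedule types resolve to EventBridge Scheduler expression strings of the form at(...), rate(...), or cron(...). The helper below is a hypothetical sketch of that resolution, not the SDK's implementation:

```python
from datetime import datetime

# Hypothetical sketch of how at/rate/cron resolve to EventBridge Scheduler
# expressions; formatting details are illustrative.
def resolve_schedule_expression(at=None, rate=None, cron=None) -> str:
    specified = [x for x in (at, rate, cron) if x is not None]
    if len(specified) != 1:
        raise ValueError("Specify exactly one of at, rate, or cron.")
    if at is not None:
        return f"at({at.strftime('%Y-%m-%dT%H:%M:%S')})"
    if rate is not None:
        value, unit = rate
        return f"rate({value} {unit})"
    return f"cron({cron})"

print(resolve_schedule_expression(rate=(12, "hours")))  # rate(12 hours)
```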

at: datetime | None#
cron: str | None#
rate: tuple | None#
resolve_schedule_expression() str[source]#

Resolve the schedule expression.

Formats a schedule expression for an EventBridge client call from the specified at, rate, or cron parameter. After resolution, any remaining errors in the syntax raise a ValidationException from EventBridge.

Returns:

Correctly string formatted schedule expression based on type.

Return type:

schedule_expression

start_date: datetime | None#
class sagemaker.mlops.workflow.ProcessingStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, property_files: List[PropertyFile] | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

ProcessingStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_processing_job.

NOTE: The CreateProcessingJob request is not quite the args list that workflow needs. ExperimentConfig cannot be included in the arguments.

property properties#

A Properties object representing the DescribeProcessingJobResponse data model.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Get the request structure for workflow service calls.

class sagemaker.mlops.workflow.QualityCheckConfig(baseline_dataset: str | PipelineVariable, dataset_format: dict, *, output_s3_uri: str | PipelineVariable | None = None, post_analytics_processor_script: str | None = None)[source]#

Bases: ABC

Quality Check Config.

baseline_dataset#

The path to the baseline_dataset file. This can be a local path or an S3 uri string

Type:

str or PipelineVariable

dataset_format#

The format of the baseline_dataset.

Type:

dict

output_s3_uri#

Desired S3 destination of the constraint_violations and statistics JSON files (default: None). If not specified, an auto-generated path is used: “s3://<default_session_bucket>/model-monitor/baselining/<job_name>/results”

Type:

str or PipelineVariable

post_analytics_processor_script#

The path to the record post-analytics processor script (default: None). This can be a local path or an S3 uri string, but cannot be a PipelineVariable.

Type:

str

baseline_dataset: str | PipelineVariable#
dataset_format: dict#
output_s3_uri: str | PipelineVariable#
post_analytics_processor_script: str#
class sagemaker.mlops.workflow.QualityCheckStep(name: str, quality_check_config: QualityCheckConfig, check_job_config: CheckJobConfig, skip_check: bool | PipelineVariable = False, fail_on_violation: bool | PipelineVariable = True, register_new_baseline: bool | PipelineVariable = False, model_package_group_name: str | PipelineVariable | None = None, supplied_baseline_statistics: str | PipelineVariable | None = None, supplied_baseline_constraints: str | PipelineVariable | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None)[source]#

Bases: Step

QualityCheck step for workflow.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that is used to define the QualityCheck step.

property properties#

A Properties object representing the output parameters of the QualityCheck step.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration etc.

class sagemaker.mlops.workflow.RetryPolicy(backoff_rate: float = 2.0, interval_seconds: int = 1, max_attempts: int | None = None, expire_after_mins: int | None = None)[source]#

Bases: Entity

RetryPolicy base class

backoff_rate#

The multiplier by which the retry interval increases during each attempt (default: 2.0)

Type:

float

interval_seconds#

An integer that represents the number of seconds before the first retry attempt (default: 1)

Type:

int

max_attempts#

A positive integer that represents the maximum number of retry attempts. (default: None)

Type:

int

expire_after_mins#

A positive integer that represents the maximum minute to expire any further retry attempt (default: None)

Type:

int

backoff_rate: float#
expire_after_mins: int#
interval_seconds: int#
max_attempts: int#
to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Get the request structure for workflow service calls.

validate_backoff_rate(_, value)[source]#

Validate the input back off rate type

validate_expire_after_mins(_, value)[source]#

Validate expire after mins

validate_interval_seconds(_, value)[source]#

Validate the input interval seconds

validate_max_attempts(_, value)[source]#

Validate the input max attempts

class sagemaker.mlops.workflow.SageMakerJobExceptionTypeEnum(*args, value=<object object>, **kwargs)[source]#

Bases: Enum

SageMaker Job ExceptionType enum.

CAPACITY_ERROR = 'SageMaker.CAPACITY_ERROR'#
INTERNAL_ERROR = 'SageMaker.JOB_INTERNAL_ERROR'#
RESOURCE_LIMIT = 'SageMaker.RESOURCE_LIMIT'#
class sagemaker.mlops.workflow.SageMakerJobStepRetryPolicy(exception_types: List[SageMakerJobExceptionTypeEnum] | None = None, failure_reason_types: List[SageMakerJobExceptionTypeEnum] | None = None, backoff_rate: float = 2.0, interval_seconds: int = 1, max_attempts: int | None = None, expire_after_mins: int | None = None)[source]#

Bases: RetryPolicy

RetryPolicy for exception thrown by SageMaker Job.

exception_types#

The SageMaker exception to match for this policy. The SageMaker exceptions captured here are the exceptions thrown by synchronously creating the job. For instance the resource limit exception.

Type:

List[SageMakerJobExceptionTypeEnum]

failure_reason_types#

the SageMaker failure reason types to match for this policy. The failure reason type is presented in FailureReason field of the Describe response, it indicates the runtime failure reason for a job.

Type:

List[SageMakerJobExceptionTypeEnum]

backoff_rate#

The multiplier by which the retry interval increases during each attempt (default: 2.0)

Type:

float

interval_seconds#

An integer that represents the number of seconds before the first retry attempt (default: 1)

Type:

int

max_attempts#

A positive integer that represents the maximum number of retry attempts. (default: None)

Type:

int

expire_after_mins#

A positive integer that represents the maximum number of minutes after which any further retry attempts expire (default: None)

Type:

int

backoff_rate: float#
expire_after_mins: int#
interval_seconds: int#
max_attempts: int#
to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for retry policy.
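The attributes above combine into a request structure roughly like the following. This is an illustrative, SDK-free sketch of what `to_request()` serializes; the helper function and the exact field names are assumptions based on the documented attributes, not the SDK's implementation. The exception-type strings come from `SageMakerJobExceptionTypeEnum`.

```python
# Illustrative stand-in for SageMakerJobStepRetryPolicy.to_request().
# Field names are assumptions based on the documented attributes.
def retry_policy_request(exception_types, backoff_rate=2.0,
                         interval_seconds=1, max_attempts=None,
                         expire_after_mins=None):
    """Build a retry-policy request dict from the documented attributes."""
    request = {
        "ExceptionType": list(exception_types),
        "BackoffRate": backoff_rate,
        "IntervalSeconds": interval_seconds,
    }
    # max_attempts and expire_after_mins are optional retry terminators.
    if max_attempts is not None:
        request["MaxAttempts"] = max_attempts
    if expire_after_mins is not None:
        request["ExpireAfterMin"] = expire_after_mins
    return request

# Retry capacity and resource-limit failures up to 5 times,
# doubling the wait between attempts starting from 30 seconds.
policy = retry_policy_request(
    ["SageMaker.CAPACITY_ERROR", "SageMaker.RESOURCE_LIMIT"],
    interval_seconds=30, max_attempts=5)
```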

class sagemaker.mlops.workflow.SelectiveExecutionConfig(selected_steps: List[str], reference_latest_execution: bool = True, source_pipeline_execution_arn: str | None = None)[source]#

Bases: object

The selective execution configuration, which defines a subset of pipeline steps to run in another SageMaker pipeline run.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Convert SelectiveExecutionConfig object to request dict.
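A minimal sketch of the request structure such a configuration converts to. The helper and field names are assumptions mirroring the documented attributes (not the SDK's `to_request()` implementation), and the execution ARN is a hypothetical example value.

```python
# Illustrative stand-in for SelectiveExecutionConfig.to_request().
# Field names are assumptions based on the documented attributes.
def selective_execution_request(selected_steps,
                                source_pipeline_execution_arn=None):
    """Build a selective-execution request dict for a subset of steps."""
    request = {"SelectedSteps": [{"StepName": s} for s in selected_steps]}
    if source_pipeline_execution_arn:
        # Reference the prior execution whose results are reused.
        request["SourcePipelineExecutionArn"] = source_pipeline_execution_arn
    return request

# Re-run only two steps, referencing a hypothetical prior execution.
req = selective_execution_request(
    ["TrainModel", "EvaluateModel"],
    "arn:aws:sagemaker:us-east-1:111122223333:pipeline/my-pipeline/execution/abc123")
```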

class sagemaker.mlops.workflow.Step(name: str, display_name: str | None = None, description: str | None = None, step_type: StepTypeEnum = None, depends_on: List[str | Step | StepCollection | StepOutput] | None = None)[source]#

Bases: Entity

Pipeline Step for SageMaker Pipelines Workflows.

add_depends_on(step_names: List[str | Step | StepCollection | StepOutput])[source]#

Add Step names or Step instances to the current Step depends on list.

abstract property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments to the particular Step service call.

property depends_on: List[str | Step | StepCollection | StepOutput] | None#

The list of steps the current Step depends on.

abstract property properties#

The properties of the particular Step.

property ref: Dict[str, str]#

Gets a reference dictionary for Step instances.

property step_only_arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments to this Step only.

Compound Steps such as the ConditionStep will have to override this method to return arguments pertaining to only that step.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for workflow service calls.
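In the pipeline definition, a step's name, type, arguments, and dependency list surface in a request structure like the sketch below. The field names follow the pipeline definition schema; the helper function and the image URI are illustrative placeholders, not the SDK's `to_request()` implementation.

```python
# Illustrative sketch of the step request structure that to_request() builds.
def step_request(name, step_type, arguments, depends_on=None):
    """Assemble a step entry for the pipeline definition."""
    request = {"Name": name, "Type": step_type, "Arguments": arguments}
    if depends_on:
        # Names or instances added via add_depends_on() end up here.
        request["DependsOn"] = depends_on
    return request

# A processing step that runs only after "TrainModel" completes.
req = step_request(
    "EvaluateModel", "Processing",
    {"AppSpecification": {"ImageUri": "my-processing-image"}},  # hypothetical
    depends_on=["TrainModel"])
```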

class sagemaker.mlops.workflow.StepCollection(name: str, steps: List[Step] = NOTHING, depends_on: List[str | Step | StepCollection | StepOutput] = None)[source]#

Bases: object

A wrapper of pipeline steps for workflow.

name#

The name of the StepCollection.

Type:

str

steps#

A list of steps.

Type:

List[Step]

depends_on#

The list of Step/StepCollection names or Step/StepCollection/StepOutput instances that the current Step depends on.

Type:

List[Union[str, Step, StepCollection, StepOutput]]

depends_on: List[str | Step | StepCollection | StepOutput]#
name: str#
property properties#

The properties of the particular StepCollection.

request_dicts() List[Dict[str, Any] | List[Dict[str, Any]]][source]#

Get the request structure for workflow service calls.

steps: List[Step]#
class sagemaker.mlops.workflow.StepExceptionTypeEnum(*args, value=<object object>, **kwargs)[source]#

Bases: Enum

Step ExceptionType enum.

SERVICE_FAULT = 'Step.SERVICE_FAULT'#
THROTTLING = 'Step.THROTTLING'#
class sagemaker.mlops.workflow.StepRetryPolicy(exception_types: List[StepExceptionTypeEnum], backoff_rate: float = 2.0, interval_seconds: int = 1, max_attempts: int | None = None, expire_after_mins: int | None = None)[source]#

Bases: RetryPolicy

RetryPolicy for a retryable step. By default, the pipeline service retries StepExceptionTypeEnum.SERVICE_FAULT and StepExceptionTypeEnum.THROTTLING regardless of pipeline step type. However, for a step defined as retryable, you can override these defaults by specifying a StepRetryPolicy.

exception_types#

The exception types to match for this policy.

Type:

List[StepExceptionTypeEnum]

backoff_rate#

The multiplier by which the retry interval increases during each attempt (default: 2.0)

Type:

float

interval_seconds#

An integer that represents the number of seconds before the first retry attempt (default: 1)

Type:

int

max_attempts#

A positive integer that represents the maximum number of retry attempts (default: None)

Type:

int

expire_after_mins#

A positive integer that represents the maximum number of minutes after which any further retry attempts expire (default: None)

Type:

int

backoff_rate: float#
expire_after_mins: int#
interval_seconds: int#
max_attempts: int#
to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for retry policy.
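The backoff parameters interact as a geometric schedule: the wait before retry *n* is `interval_seconds` multiplied by `backoff_rate` raised to *n*. A minimal sketch of that arithmetic (illustrative only, not SDK code):

```python
def retry_intervals(interval_seconds=1, backoff_rate=2.0, max_attempts=5):
    """Seconds to wait before each retry attempt under geometric backoff."""
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]

# With interval_seconds=30 and backoff_rate=2.0, four attempts wait
# 30, 60, 120, and 240 seconds respectively.
waits = retry_intervals(interval_seconds=30, backoff_rate=2.0, max_attempts=4)
```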

class sagemaker.mlops.workflow.StepTypeEnum(value)[source]#

Bases: Enum

Enum of Step types.

AUTOML = 'AutoML'#
CALLBACK = 'Callback'#
CLARIFY_CHECK = 'ClarifyCheck'#
CONDITION = 'Condition'#
CREATE_MODEL = 'Model'#
EMR = 'EMR'#
EMR_SERVERLESS = 'EMRServerless'#
FAIL = 'Fail'#
LAMBDA = 'Lambda'#
PROCESSING = 'Processing'#
QUALITY_CHECK = 'QualityCheck'#
REGISTER_MODEL = 'RegisterModel'#
TRAINING = 'Training'#
TRANSFORM = 'Transform'#
TUNING = 'Tuning'#
class sagemaker.mlops.workflow.TrainingStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

TrainingStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_training_job.

NOTE: The CreateTrainingJob request is not quite the args list that workflow needs.

property properties#

A Properties object representing the DescribeTrainingJobResponse data model.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the request dictionary with cache configuration.

class sagemaker.mlops.workflow.TransformStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

TransformStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_transform_job.

NOTE: The CreateTransformJob request is not quite the args list that workflow needs. ExperimentConfig cannot be included in the arguments.

property properties#

A Properties object representing the DescribeTransformJobResponse data model.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the request dictionary with cache configuration.

class sagemaker.mlops.workflow.Trigger(name: str | None = None, enabled: bool | None = True)[source]#

Bases: object

Abstract class representing a Pipeline Trigger

name#

The name of the trigger; defaults to the pipeline name.

Type:

str

enabled#

The state of the schedule; the default True resolves to ‘ENABLED’.

Type:

boolean

enabled: bool | None#
name: str | None#
resolve_trigger_name(pipeline_name: str) str[source]#

Resolve the schedule name given a parent pipeline.

Parameters:

pipeline_name (str) – Parent pipeline name

Returns:

Resolved schedule name.

Return type:

str

resolve_trigger_state() str[source]#

Helper method that resolves the trigger state to its ENABLED/DISABLED literal.

Returns:

ENABLED/DISABLED string literal

Return type:

(str)
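The two resolution rules above can be sketched as plain functions (stand-in code, not the SDK implementation): the trigger name falls back to the parent pipeline's name, and the enabled flag maps to the ENABLED/DISABLED literals.

```python
def resolve_trigger_name(name, pipeline_name):
    """Return the trigger's own name, or the pipeline name if unset."""
    return name if name else pipeline_name

def resolve_trigger_state(enabled=True):
    """Map the boolean enabled flag to its schedule-state literal."""
    return "ENABLED" if enabled else "DISABLED"
```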

class sagemaker.mlops.workflow.TuningStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

TuningStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_hyper_parameter_tuning_job.

NOTE: The CreateHyperParameterTuningJob request is not quite the args list that workflow needs.

get_top_model_s3_uri(top_k: int, s3_bucket: str, prefix: str = '') Join[source]#

Get the model artifact S3 URI from the top performing training jobs.

Parameters:
  • top_k (int) – The index of the top performing training job. The tuning step stores up to 50 top performing training jobs; a valid top_k value is from 0 to 49. The best training job model is at index 0.

  • s3_bucket (str) – The S3 bucket to store the training job output artifact.

  • prefix (str) – The S3 key prefix to store the training job output artifact.

property properties#

A Properties object representing the DescribeHyperParameterTuningJobResponse and ListTrainingJobsForHyperParameterTuningJobResponse data models.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the request dictionary with cache configuration.
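A sketch of the S3 URI that get_top_model_s3_uri resolves to at runtime: the model artifact of the k-th best training job under the given bucket and prefix. The join below mirrors the SDK's Join-on-"/" behavior; the helper and the training job name are hypothetical placeholders, not SDK code.

```python
def top_model_s3_uri(training_job_name, s3_bucket, prefix=""):
    """Assemble the model artifact URI for one top training job."""
    parts = ["s3:/", s3_bucket]
    if prefix:
        parts.append(prefix)
    # Each training job writes its artifact under <job-name>/output/.
    parts += [training_job_name, "output", "model.tar.gz"]
    return "/".join(parts)

uri = top_model_s3_uri("tuning-job-003-abcdef", "my-bucket", "tuning-output")
```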

Modules

automl_step

The AutoMLStep definition for SageMaker Pipelines Workflows

callback_step

The step definitions for workflow.

check_job_config

Common config for QualityCheckStep and ClarifyCheckStep.

clarify_check_step

The step definitions for workflow.

condition_step

The step definitions for workflow.

emr_serverless_step

The step definitions for EMR Serverless workflow.

emr_step

The step definitions for workflow.

fail_step

The Step definitions for SageMaker Pipelines Workflows.

function_step

A proxy to the function returns of arbitrary type.

lambda_step

The step definitions for workflow.

model_step

The ModelStep definition for SageMaker Pipelines Workflows

monitor_batch_transform_step

The MonitorBatchTransform definition for SageMaker Pipelines Workflows

notebook_job_step

The notebook job step definitions for workflow.

parallelism_config

Pipeline Parallelism Configuration

pipeline

The Pipeline entity for workflow.

pipeline_experiment_config

Pipeline experiment config for SageMaker pipeline.

quality_check_step

The step definitions for workflow.

retry

Retry policies for pipeline steps.

selective_execution_config

Pipeline selective execution configuration.

step_collections

The step definitions for workflow.

steps

The Step definitions for SageMaker Pipelines Workflows.

triggers

The Trigger entity for EventBridge Integration.