sagemaker.mlops.workflow.pipeline

The Pipeline entity for workflow.

Functions

format_start_parameters(parameters)

Formats start parameter overrides as a list of dicts.

get_function_step_result(step_name, ...)

Helper function to retrieve the output of a @step decorated function.

interpolate(request_obj, ...)

Replaces Parameter values in a nested Dict[str, Any], or a list of them, with their workflow expressions.

update_args(args, **kwargs)

Updates the request arguments dict with a value, if populated.

Classes

Pipeline(name, parameters, ...)

A SageMaker pipeline: a named workflow of steps and parameters that can be created, updated, and executed.

PipelineGraph(steps)

Helper class representing the Pipeline Directed Acyclic Graph (DAG).

class sagemaker.mlops.workflow.pipeline.Pipeline(name: str = '', parameters: typing.Sequence[sagemaker.core.workflow.parameters.Parameter] | None = None, pipeline_experiment_config: sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfig | None = <sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfig object>, mlflow_config: sagemaker.core.shapes.shapes.MlflowConfig | None = None, steps: typing.Sequence[sagemaker.mlops.workflow.steps.Step | sagemaker.core.workflow.step_outputs.StepOutput] | None = None, sagemaker_session: sagemaker.core.helper.session_helper.Session | None = None, pipeline_definition_config: sagemaker.core.workflow.pipeline_definition_config.PipelineDefinitionConfig | None = <sagemaker.core.workflow.pipeline_definition_config.PipelineDefinitionConfig object>)

Bases: object

A SageMaker pipeline: a named workflow of steps and parameters that can be created, updated, and executed.

build_parameters_from_execution(pipeline_execution_arn: str, parameter_value_overrides: Dict[str, str | bool | int | float] | None = None) Dict[str, str | bool | int | float]

Gets the parameters from a pipeline execution and updates them with optional parameter value overrides.

Parameters:
  • pipeline_execution_arn (str) – The ARN of the reference pipeline execution.

  • parameter_value_overrides (Dict[str, Union[str, bool, int, float]]) – Parameter values that override the corresponding parameters from the referenced execution.

Returns:

A parameter dict built from an execution and provided parameter value overrides.
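The merge described above can be sketched in plain Python. This is a minimal sketch, not the SDK's implementation: it assumes the reference execution's parameters have already been fetched as a list of {"Name": ..., "Value": ...} dicts (the shape of PipelineParameters in the boto3 list_pipeline_parameters_for_execution response), and the helper name merge_execution_parameters is hypothetical.

```python
from typing import Dict, List, Optional, Union

ParamValue = Union[str, bool, int, float]


def merge_execution_parameters(
    execution_parameters: List[Dict[str, str]],
    parameter_value_overrides: Optional[Dict[str, ParamValue]] = None,
) -> Dict[str, ParamValue]:
    """Hypothetical helper: build a parameter dict from an execution, then apply overrides."""
    # Start from the values recorded for the reference execution.
    merged: Dict[str, ParamValue] = {p["Name"]: p["Value"] for p in execution_parameters}
    # Overrides take precedence over the reference execution's values.
    if parameter_value_overrides:
        merged.update(parameter_value_overrides)
    return merged


# Example: reuse an earlier execution's inputs but change the instance count.
previous = [
    {"Name": "InputDataUrl", "Value": "s3://my-bucket/data.csv"},  # hypothetical bucket
    {"Name": "InstanceCount", "Value": "1"},
]
params = merge_execution_parameters(previous, {"InstanceCount": 2})
```

Parameters that are not overridden keep the values recorded for the reference execution.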

create(role_arn: str = None, description: str = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, parallelism_config: ParallelismConfiguration = None) Dict[str, Any]

Creates a Pipeline in the Pipelines service.

Parameters:
  • role_arn (str) – The ARN of the role that the pipeline assumes to create step artifacts.

  • description (str) – A description of the pipeline.

  • tags (Optional[Tags]) – Tags to be passed to the pipeline.

  • parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.

Returns:

A response dict from the service.

definition() str

Converts the pipeline's request structure to its string representation for workflow service calls.

Returns:

A JSON-formatted string of the pipeline definition.

delete() Dict[str, Any]

Deletes a Pipeline in the Workflow service.

Returns:

A response dict from the service.

delete_triggers(trigger_names: List[str])

Deletes triggers for the parent SageMaker Pipeline, if they exist.

Parameters:

trigger_names (List[str]) – List of trigger names to be deleted. Currently, these can only be EventBridge schedule names.

describe(pipeline_version_id: int | None = None) Dict[str, Any]

Describes a Pipeline in the Workflow service.

Parameters:

pipeline_version_id (Optional[int]) – The version ID of the pipeline to describe.

Returns:

A response dict from the service. See the boto3 client documentation for describe_pipeline.

describe_trigger(trigger_name: str) Dict[str, Any]

Describes a trigger for the parent SageMaker Pipeline.

Parameters:

trigger_name (str) – Trigger name to be described. Currently, this can only be an EventBridge schedule name.

Returns:

Trigger describe responses from EventBridge.

Return type:

Dict[str, str]

property latest_pipeline_version_id

Retrieves the latest version ID of this pipeline.

list_executions(sort_by: str | None = None, sort_order: str | None = None, max_results: int | None = None, next_token: str | None = None) Dict[str, Any]

Lists a pipeline’s executions.

Parameters:
  • sort_by (str) – The field by which to sort results (CreationTime/PipelineExecutionArn).

  • sort_order (str) – The sort order for results (Ascending/Descending).

  • max_results (int) – The maximum number of pipeline executions to return in the response.

  • next_token (str) – If the result of the previous ListPipelineExecutions request was truncated, the response includes a NextToken. To retrieve the next set of pipeline executions, use the token in the next request.

Returns:

List of Pipeline Execution Summaries. See boto3 client list_pipeline_executions https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_pipeline_executions
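Because results are paged through next_token, collecting every execution means calling list_executions repeatedly until no NextToken comes back. The sketch below shows that loop; it assumes the returned dict carries the PipelineExecutionSummaries and NextToken keys of the boto3 list_pipeline_executions response. The helper iter_execution_summaries and the two-page fake_list_executions are hypothetical; with a real Pipeline object you would pass its bound list_executions method instead of the fake.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def iter_execution_summaries(
    list_executions: Callable[..., Dict[str, Any]],
    **kwargs: Any,
) -> Iterator[Dict[str, Any]]:
    """Yield every execution summary, following NextToken until it is absent."""
    next_token: Optional[str] = None
    while True:
        response = list_executions(next_token=next_token, **kwargs)
        yield from response.get("PipelineExecutionSummaries", [])
        next_token = response.get("NextToken")
        if not next_token:
            break


# Hypothetical stand-in for pipeline.list_executions that returns two pages.
_pages = {
    None: {"PipelineExecutionSummaries": [{"PipelineExecutionArn": "arn-1"}], "NextToken": "t1"},
    "t1": {"PipelineExecutionSummaries": [{"PipelineExecutionArn": "arn-2"}]},
}


def fake_list_executions(next_token: Optional[str] = None, **kwargs: Any) -> Dict[str, Any]:
    return _pages[next_token]


arns = [s["PipelineExecutionArn"] for s in iter_execution_summaries(fake_list_executions)]
```

Extra keyword arguments such as sort_by or max_results are forwarded unchanged to every page request.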

list_pipeline_versions(sort_order: str | None = None, max_results: int | None = None, next_token: str | None = None) str

Lists a pipeline’s versions.

Parameters:
  • sort_order (str) – The sort order for results (Ascending/Descending).

  • max_results (int) – The maximum number of pipeline versions to return in the response.

  • next_token (str) – If the result of the previous ListPipelineVersions request was truncated, the response includes a NextToken. To retrieve the next set of pipeline versions, use the token in the next request.

Returns:

List of Pipeline Version Summaries. See boto3 client list_pipeline_versions https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/list_pipeline_versions.html#

put_triggers(triggers: List[Trigger], role_arn: str | None = None) List[str]

Attaches triggers to the parent SageMaker Pipeline.

Parameters:
  • triggers (List[Trigger]) – List of supported triggers. Currently, this can only be of type PipelineSchedule.

  • role_arn (str) – The ARN of the role that the EventBridge service assumes.

Returns:

The ARNs of the successfully created triggers. Currently, the Python SDK only supports PipelineSchedule triggers, so this is a list of the EventBridge Schedule ARNs that were created or upserted.

Return type:

List[str]

start(parameters: Dict[str, str | bool | int | float] = None, execution_display_name: str = None, execution_description: str = None, parallelism_config: ParallelismConfiguration = None, selective_execution_config: SelectiveExecutionConfig = None, mlflow_experiment_name: str = None, pipeline_version_id: int = None)

Starts a Pipeline execution in the Workflow service.

Parameters:
  • parameters (Dict[str, Union[str, bool, int, float]]) – Values that override the pipeline parameters.

  • execution_display_name (str) – The display name of the pipeline execution.

  • execution_description (str) – A description of the execution.

  • parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.

  • selective_execution_config (Optional[SelectiveExecutionConfig]) – The configuration for selective step execution.

  • mlflow_experiment_name (str) – Optional MLflow experiment name that overrides the experiment name specified in the pipeline’s mlflow_config. The override applies to this pipeline execution only and does not modify the pipeline definition.

  • pipeline_version_id (Optional[int]) – The version ID of the pipeline to start the execution from. If not specified, the latest version ID is used.

Returns:

A _PipelineExecution instance, if successful.

update(role_arn: str | None = None, description: str | None = None, parallelism_config: ParallelismConfiguration | None = None) Dict[str, Any]

Updates a Pipeline in the Workflow service.

Parameters:
  • role_arn (str) – The ARN of the role that the pipeline assumes to create step artifacts.

  • description (str) – A description of the pipeline.

  • parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.

Returns:

A response dict from the service.

upsert(role_arn: str | None = None, description: str | None = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, parallelism_config: ParallelismConfiguration | None = None) Dict[str, Any]

Creates a pipeline or updates it, if it already exists.

Parameters:
  • role_arn (str) – The ARN of the role that the workflow assumes to create step artifacts.

  • description (str) – A description of the pipeline.

  • tags (Optional[Tags]) – Tags to be passed.

  • parallelism_config (Optional[ParallelismConfiguration]) – Config for parallel steps. Parallelism configuration that is applied to each of the executions of the pipeline.

Returns:

A response dict from the service.
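The create-or-update behavior of upsert can be illustrated with a small, self-contained sketch. It is not the SDK's implementation: the exception class PipelineAlreadyExistsError and the functions upsert_sketch and make_fake_service are hypothetical, and the in-memory fake stands in for the real create and update service calls, whose actual error types and responses may differ.

```python
from typing import Any, Callable, Dict, List, Set, Tuple


class PipelineAlreadyExistsError(Exception):
    """Hypothetical error meaning 'a pipeline with this name already exists'."""


def upsert_sketch(
    create: Callable[[], Dict[str, Any]],
    update: Callable[[], Dict[str, Any]],
) -> Dict[str, Any]:
    """Create the pipeline; if it already exists, update it instead."""
    try:
        return create()
    except PipelineAlreadyExistsError:
        return update()


def make_fake_service(
    existing: Set[str], name: str
) -> Tuple[Callable[[], Dict[str, Any]], Callable[[], Dict[str, Any]], List[str]]:
    """Hypothetical in-memory stand-in for the create and update service calls."""
    log: List[str] = []

    def create() -> Dict[str, Any]:
        if name in existing:
            raise PipelineAlreadyExistsError(name)
        existing.add(name)
        log.append("create")
        return {"Action": "created"}

    def update() -> Dict[str, Any]:
        log.append("update")
        return {"Action": "updated"}

    return create, update, log


create, update, log = make_fake_service(set(), "MyPipeline")
first = upsert_sketch(create, update)   # the pipeline does not exist yet, so it is created
second = upsert_sketch(create, update)  # it now exists, so it is updated
```

Trying create first and falling back to update keeps the common first-deployment path to a single call.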

class sagemaker.mlops.workflow.pipeline.PipelineGraph(steps: Sequence[Step])

Bases: object

Helper class representing the Pipeline Directed Acyclic Graph (DAG).

steps

Sequence of Step objects, one for each node in the pipeline DAG.

Type:

Sequence[Step]

classmethod from_pipeline(pipeline: Pipeline)

Creates a PipelineGraph object from a Pipeline object.

get_steps_in_sub_dag(current_step: Step, sub_dag_steps: Set[str] | None = None) Set[str]

Gets the names of all steps in the sub-DAG of the current step, including the current step itself.

Returns the set of step names in the sub-DAG.
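The traversal can be pictured as a depth-first walk that accumulates names into a set, much like the sub_dag_steps accumulator in the signature. This sketch assumes the graph is a plain dict mapping each step name to the names of its downstream steps, and it reads "sub-DAG of the current step" as the step plus everything reachable from it; the function steps_in_sub_dag and the example DAG are hypothetical and operate on names rather than Step objects.

```python
from typing import Dict, List, Optional, Set


def steps_in_sub_dag(
    adjacency: Dict[str, List[str]],
    current: str,
    visited: Optional[Set[str]] = None,
) -> Set[str]:
    """Collect `current` plus every step reachable from it via downstream edges."""
    if visited is None:
        visited = set()
    visited.add(current)
    for child in adjacency.get(current, []):
        if child not in visited:
            steps_in_sub_dag(adjacency, child, visited)
    return visited


# Hypothetical DAG: Preprocess -> Train -> Evaluate -> Register, plus Preprocess -> Report
dag = {
    "Preprocess": ["Train", "Report"],
    "Train": ["Evaluate"],
    "Evaluate": ["Register"],
}
train_sub_dag = steps_in_sub_dag(dag, "Train")
```

Steps upstream of the starting step (here Preprocess) and sibling branches (Report) are not part of the result.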

is_cyclic() bool

Check if this pipeline graph is cyclic.

Returns True if it is cyclic, False otherwise.
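A standard way to answer this question is a depth-first search that tracks which nodes are on the current path: reaching such a node again means a back edge, and therefore a cycle. The sketch below applies that technique to a hypothetical adjacency dict of step names; it illustrates the check and is not necessarily how PipelineGraph implements it.

```python
from typing import Dict, List, Set


def is_cyclic(adjacency: Dict[str, List[str]]) -> bool:
    """Return True if the directed graph contains a cycle (recursive DFS)."""
    visiting: Set[str] = set()  # nodes on the current DFS path
    done: Set[str] = set()      # nodes whose descendants are fully explored

    def visit(node: str) -> bool:
        if node in visiting:
            return True  # back edge, so there is a cycle
        if node in done:
            return False
        visiting.add(node)
        for child in adjacency.get(node, []):
            if visit(child):
                return True
        visiting.remove(node)
        done.add(node)
        return False

    # Include nodes that only ever appear as edge targets.
    nodes = set(adjacency) | {c for children in adjacency.values() for c in children}
    return any(visit(n) for n in nodes)
```

Marking fully explored nodes as done keeps the check linear in the number of steps and dependencies.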

sagemaker.mlops.workflow.pipeline.format_start_parameters(parameters: Dict[str, Any]) List[Dict[str, Any]]

Formats start parameter overrides as a list of dicts.

This list of dicts adheres to the request schema of:

{"Name": "MyParameterName", "Value": "MyValue"}

Parameters:

parameters (Dict[str, Any]) – A dict of named values where the keys are the names of the parameters to pass values into.
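The conversion is a one-line transformation of the dict into the request schema. This minimal sketch assumes each value is converted with str(), because the StartPipelineExecution API takes parameter values as strings; the name format_start_parameters_sketch is hypothetical, and the SDK function may treat empty input or value types differently.

```python
from typing import Any, Dict, List


def format_start_parameters_sketch(parameters: Dict[str, Any]) -> List[Dict[str, str]]:
    """Turn {"name": value} into [{"Name": "name", "Value": "value"}, ...]."""
    # Dict insertion order is preserved, so the output order matches the input.
    return [{"Name": name, "Value": str(value)} for name, value in parameters.items()]


overrides = format_start_parameters_sketch({"InstanceCount": 2, "ModelApprovalStatus": "Approved"})
```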

sagemaker.mlops.workflow.pipeline.get_function_step_result(step_name: str, step_list: list, execution_id: str, sagemaker_session: Session)

Helper function to retrieve the output of a @step decorated function.

Parameters:
  • step_name (str) – The name of the pipeline step.

  • step_list (list) – A list of executed pipeline steps of the specified execution.

  • execution_id (str) – The ID of the pipeline execution.

  • sagemaker_session (Session) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed.

Returns:

The step output.

Raises:
  • ValueError – If the provided step is not a @step-decorated function.

  • RemoteFunctionError – If the provided step is not in "Completed" status.

sagemaker.mlops.workflow.pipeline.interpolate(request_obj: Dict[str, Any] | List[Dict[str, Any]], callback_output_to_step_map: Dict[str, str], lambda_output_to_step_map: Dict[str, str], pipeline_name: str) Dict[str, Any] | List[Dict[str, Any]]

Replaces Parameter values in a nested Dict[str, Any], or a list of them, with their workflow expressions.

Parameters:
  • request_obj (RequestType) – The request dict.

  • callback_output_to_step_map (Dict[str, str]) – A dict of output name -> step name.

  • lambda_output_to_step_map (Dict[str, str]) – A dict of output name -> step name.

  • pipeline_name (str) – The name of the pipeline to be interpolated.

Returns:

The request dict with Parameter values replaced by their expression.

Return type:

RequestType
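The core of the interpolation is a recursive walk over dicts and lists that swaps each parameter object for its expression. In this sketch ToyParameter and interpolate_sketch are hypothetical stand-ins, and the expression shape {"Get": "Parameters.<name>"} is assumed from the pipeline definition JSON format. The callback, Lambda, and pipeline-name handling of the real function are deliberately left out.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class ToyParameter:
    """Hypothetical stand-in for a workflow Parameter."""

    name: str

    @property
    def expr(self) -> Dict[str, str]:
        # Assumed shape of a parameter reference in the pipeline definition.
        return {"Get": f"Parameters.{self.name}"}


def interpolate_sketch(obj: Any) -> Any:
    """Recursively replace ToyParameter instances with their workflow expressions."""
    if isinstance(obj, ToyParameter):
        return obj.expr
    if isinstance(obj, dict):
        return {key: interpolate_sketch(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [interpolate_sketch(item) for item in obj]
    return obj  # plain values are left untouched


request = {"Arguments": {"InstanceCount": ToyParameter("InstanceCount"), "Tags": [ToyParameter("Team"), "fixed"]}}
interpolated = interpolate_sketch(request)
```

Building new dicts and lists, instead of mutating the input in place, leaves the original request object reusable.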

sagemaker.mlops.workflow.pipeline.update_args(args: Dict[str, Any], **kwargs)

Updates the request arguments dict with a value, if populated.

This handles the case where the service API does not accept None as an argument value.

Parameters:
  • args (Dict[str, Any]) – The request arguments dict.

  • kwargs – key, value pairs to update the args dict with.
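A minimal sketch of this behavior, assuming "populated" means "not None" (so falsy values such as 0 or "" are still copied). The name update_args_sketch is hypothetical, as are the example role ARN and pipeline name.

```python
from typing import Any, Dict


def update_args_sketch(args: Dict[str, Any], **kwargs: Any) -> None:
    """Copy each keyword argument into args in place, skipping values that are None."""
    for key, value in kwargs.items():
        if value is not None:
            args[key] = value


request = {"PipelineName": "MyPipeline"}
update_args_sketch(
    request,
    PipelineDescription=None,  # omitted: None values are never sent to the service
    RoleArn="arn:aws:iam::111122223333:role/Example",
)
```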