sagemaker.mlops.workflow.pipeline
The Pipeline entity for workflow.
Functions
- format_start_parameters – Formats start parameter overrides as a list of dicts.
- get_function_step_result – Helper function to retrieve the output of a @step decorated function.
- interpolate – Replaces Parameter values in a list of nested Dict[str, Any] with their workflow expression.
- update_args – Updates the request arguments dict with a value, if populated.
Classes
- Pipeline – Pipeline for workflow.
- PipelineGraph – Helper class representing the Pipeline Directed Acyclic Graph (DAG).
- class sagemaker.mlops.workflow.pipeline.Pipeline(name: str = '', parameters: typing.Sequence[sagemaker.core.workflow.parameters.Parameter] | None = None, pipeline_experiment_config: sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfig | None = <sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfig object>, mlflow_config: sagemaker.core.shapes.shapes.MlflowConfig | None = None, steps: typing.Sequence[sagemaker.mlops.workflow.steps.Step | sagemaker.core.workflow.step_outputs.StepOutput] | None = None, sagemaker_session: sagemaker.core.helper.session_helper.Session | None = None, pipeline_definition_config: sagemaker.core.workflow.pipeline_definition_config.PipelineDefinitionConfig | None = <sagemaker.core.workflow.pipeline_definition_config.PipelineDefinitionConfig object>)
Bases: object
Pipeline for workflow.
- build_parameters_from_execution(pipeline_execution_arn: str, parameter_value_overrides: Dict[str, str | bool | int | float] | None = None) -> Dict[str, str | bool | int | float]
Gets the parameters from an execution and updates them with optional parameter value overrides.
- Parameters:
pipeline_execution_arn (str) – The ARN of the reference pipeline execution.
parameter_value_overrides (Dict[str, Union[str, bool, int, float]]) – Parameter values that override the parameters taken from the referenced execution.
- Returns:
A parameter dict built from an execution and provided parameter value overrides.
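The merge described above can be pictured with plain dictionaries. The sketch below uses a hypothetical helper, merge_execution_parameters, and assumes the reference execution's parameters have already been fetched as a name-to-value dict; values in the overrides win over the execution's values. It illustrates the documented behavior and is not the SDK's implementation.

```python
from typing import Dict, Optional, Union

ParamValue = Union[str, bool, int, float]


def merge_execution_parameters(
    execution_parameters: Dict[str, ParamValue],
    parameter_value_overrides: Optional[Dict[str, ParamValue]] = None,
) -> Dict[str, ParamValue]:
    """Hypothetical helper: start from an execution's parameters, then apply overrides."""
    # Copy so the caller's dict is not mutated.
    merged = dict(execution_parameters)
    if parameter_value_overrides:
        # Overrides take precedence over values from the reference execution.
        merged.update(parameter_value_overrides)
    return merged


previous = {"InstanceType": "ml.m5.xlarge", "Epochs": "10"}
print(merge_execution_parameters(previous, {"Epochs": "20"}))
# {'InstanceType': 'ml.m5.xlarge', 'Epochs': '20'}
```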
- create(role_arn: str = None, description: str = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, parallelism_config: ParallelismConfiguration = None) -> Dict[str, Any]
Creates a Pipeline in the Pipelines service.
- Parameters:
role_arn (str) – The ARN of the role that is assumed by the pipeline to create step artifacts.
description (str) – A description of the pipeline.
tags (Optional[Tags]) – Tags to be passed to the pipeline.
parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.
- Returns:
A response dict from the service.
- definition() -> str
Converts the request structure to its string representation for workflow service calls.
- Returns:
A JSON-formatted string of the pipeline definition.
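Because definition() returns JSON, its result can be parsed with the standard json module, for example to check which steps and parameters a pipeline will register before calling create() or upsert(). The dictionary below is a hand-written, abbreviated stand-in; its field names are assumptions modeled on the pipeline definition format, not output captured from the SDK. With a real pipeline the input would be json.loads(pipeline.definition()).

```python
import json

# Hypothetical, abbreviated stand-in for the string returned by Pipeline.definition().
# Real definitions carry more fields; only a few are shown here.
definition_str = json.dumps({
    "Version": "2020-12-01",
    "Parameters": [
        {"Name": "InputData", "Type": "String", "DefaultValue": "s3://example-bucket/input"}
    ],
    "Steps": [
        {"Name": "Preprocess", "Type": "Processing"},
        {"Name": "Train", "Type": "Training"},
    ],
})

# The definition is plain JSON, so it can be inspected with the standard library.
definition = json.loads(definition_str)
step_names = [step["Name"] for step in definition["Steps"]]
parameter_names = [param["Name"] for param in definition["Parameters"]]
print(step_names)       # ['Preprocess', 'Train']
print(parameter_names)  # ['InputData']
```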
- delete() -> Dict[str, Any]
Deletes a Pipeline in the Workflow service.
- Returns:
A response dict from the service.
- delete_triggers(trigger_names: List[str])
Deletes triggers for a parent SageMaker Pipeline, if they exist.
- Parameters:
trigger_names (List[str]) – List of trigger names to be deleted. Currently, these can only be EventBridge schedule names.
- describe(pipeline_version_id: int | None = None) -> Dict[str, Any]
Describes a Pipeline in the Workflow service.
- Parameters:
pipeline_version_id (Optional[int]) – The version ID of the pipeline to describe.
- Returns:
Response dict from the service. See the boto3 client documentation for describe_pipeline.
- describe_trigger(trigger_name: str) -> Dict[str, Any]
Describes a trigger for a parent SageMaker Pipeline.
- Parameters:
trigger_name (str) – Trigger name to be described. Currently, this can only be an EventBridge schedule name.
- Returns:
Trigger describe responses from EventBridge.
- Return type:
Dict[str, str]
- property latest_pipeline_version_id
Retrieves the latest version ID of this pipeline.
- list_executions(sort_by: str | None = None, sort_order: str | None = None, max_results: int | None = None, next_token: str | None = None) -> Dict[str, Any]
Lists a pipeline’s executions.
- Parameters:
sort_by (str) – The field by which to sort results (CreationTime/PipelineExecutionArn).
sort_order (str) – The sort order for results (Ascending/Descending).
max_results (int) – The maximum number of pipeline executions to return in the response.
next_token (str) – If the result of the previous ListPipelineExecutions request was truncated, the response includes a NextToken. To retrieve the next set of pipeline executions, use the token in the next request.
- Returns:
List of Pipeline Execution Summaries. See boto3 client list_pipeline_executions https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_pipeline_executions
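The NextToken contract described above can be consumed with a small generator that keeps requesting pages until no token comes back. In this sketch, list_page is a hypothetical callable that takes a token (or None) and returns a dict shaped like the boto3 list_pipeline_executions response (PipelineExecutionSummaries, NextToken). In practice it might wrap pipeline.list_executions(next_token=token), but that wiring is an assumption, and the two fake pages are made-up data.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def iter_execution_summaries(
    list_page: Callable[[Optional[str]], Dict[str, Any]],
) -> Iterator[Dict[str, Any]]:
    """Yield every execution summary, following NextToken until it is absent."""
    next_token: Optional[str] = None
    while True:
        page = list_page(next_token)
        yield from page.get("PipelineExecutionSummaries", [])
        next_token = page.get("NextToken")
        if not next_token:
            # No token means the previous response was not truncated.
            break


# Made-up pages keyed by the token used to request them (None = first page).
_PAGES = {
    None: {"PipelineExecutionSummaries": [{"PipelineExecutionArn": "arn-1"}], "NextToken": "t1"},
    "t1": {"PipelineExecutionSummaries": [{"PipelineExecutionArn": "arn-2"}]},
}
arns = [s["PipelineExecutionArn"] for s in iter_execution_summaries(_PAGES.__getitem__)]
print(arns)  # ['arn-1', 'arn-2']
```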
- list_pipeline_versions(sort_order: str | None = None, max_results: int | None = None, next_token: str | None = None) -> str
Lists a pipeline’s versions.
- Parameters:
sort_order (str) – The sort order for results (Ascending/Descending).
max_results (int) – The maximum number of pipeline versions to return in the response.
next_token (str) – If the result of the previous ListPipelineVersions request was truncated, the response includes a NextToken. To retrieve the next set of pipeline versions, use the token in the next request.
- Returns:
List of Pipeline Version Summaries. See boto3 client list_pipeline_versions https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/list_pipeline_versions.html#
- put_triggers(triggers: List[Trigger], role_arn: str | None = None) -> List[str]
Attaches triggers to a parent SageMaker Pipeline.
- Parameters:
triggers (List[Trigger]) – List of supported triggers. Currently, this can only be of type PipelineSchedule.
role_arn (str) – The ARN of the role that is assumed by the EventBridge service.
- Returns:
The ARNs of the successfully created triggers. Currently, the Python SDK supports only PipelineSchedule triggers, so this is a list of the ARNs of the EventBridge schedules that were created or upserted.
- Return type:
List[str]
- start(parameters: Dict[str, str | bool | int | float] = None, execution_display_name: str = None, execution_description: str = None, parallelism_config: ParallelismConfiguration = None, selective_execution_config: SelectiveExecutionConfig = None, mlflow_experiment_name: str = None, pipeline_version_id: int = None)
Starts a Pipeline execution in the Workflow service.
- Parameters:
parameters (Dict[str, Union[str, bool, int, float]]) – Values to override pipeline parameters.
execution_display_name (str) – The display name of the pipeline execution.
execution_description (str) – A description of the execution.
parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.
selective_execution_config (Optional[SelectiveExecutionConfig]) – The configuration for selective step execution.
mlflow_experiment_name (str) – Optional MLflow experiment name to override the experiment name specified in the pipeline’s mlflow_config. If provided, this will override the experiment name for this specific pipeline execution only, without modifying the pipeline definition.
pipeline_version_id (Optional[int]) – The version ID of the pipeline to start the execution from. If not specified, the latest version ID is used.
- Returns:
A _PipelineExecution instance, if successful.
- update(role_arn: str | None = None, description: str | None = None, parallelism_config: ParallelismConfiguration | None = None) -> Dict[str, Any]
Updates a Pipeline in the Workflow service.
- Parameters:
role_arn (str) – The ARN of the role that is assumed by the pipeline to create step artifacts.
description (str) – A description of the pipeline.
parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.
- Returns:
A response dict from the service.
- upsert(role_arn: str | None = None, description: str | None = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, parallelism_config: ParallelismConfiguration | None = None) -> Dict[str, Any]
Creates a pipeline, or updates it if it already exists.
- Parameters:
role_arn (str) – The ARN of the role that is assumed by the pipeline to create step artifacts.
description (str) – A description of the pipeline.
tags (Optional[Tags]) – Tags to be passed to the pipeline.
parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline.
- Returns:
A response dict from the service.
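One common way to express create-or-update is to attempt creation first and fall back to an update only when the service reports that the resource already exists. The sketch below shows that pattern against a hypothetical in-memory FakePipelineService with a hypothetical AlreadyExistsError; it does not call SageMaker and makes no claim about how the SDK detects the conflict.

```python
from typing import Any, Dict


class AlreadyExistsError(Exception):
    """Hypothetical stand-in for a service "resource already exists" error."""


class FakePipelineService:
    """Hypothetical in-memory service used only to illustrate create-or-update."""

    def __init__(self) -> None:
        self.pipelines: Dict[str, Dict[str, Any]] = {}

    def create(self, name: str, definition: str) -> Dict[str, Any]:
        if name in self.pipelines:
            raise AlreadyExistsError(f"Pipeline {name} already exists")
        self.pipelines[name] = {"definition": definition}
        return {"Action": "created"}

    def update(self, name: str, definition: str) -> Dict[str, Any]:
        self.pipelines[name]["definition"] = definition
        return {"Action": "updated"}


def upsert(service: FakePipelineService, name: str, definition: str) -> Dict[str, Any]:
    # Try to create first; fall back to update only when the pipeline already exists.
    try:
        return service.create(name, definition)
    except AlreadyExistsError:
        return service.update(name, definition)


svc = FakePipelineService()
print(upsert(svc, "my-pipeline", "{}")["Action"])        # created
print(upsert(svc, "my-pipeline", '{"v": 2}')["Action"])  # updated
```

Other errors propagate unchanged, so only the "already exists" case is converted into an update.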
- class sagemaker.mlops.workflow.pipeline.PipelineGraph(steps: Sequence[Step])
Bases: object
Helper class representing the Pipeline Directed Acyclic Graph (DAG).
- classmethod from_pipeline(pipeline: Pipeline)
Creates a PipelineGraph object from a Pipeline object.
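A DAG helper of this kind typically needs two things: an order in which steps can run so that every step follows its dependencies, and a way to reject cycles. The sketch below shows both with Kahn's algorithm over a hypothetical mapping from step name to the names it depends on. It is a generic illustration, not PipelineGraph's implementation, and the step names are made up.

```python
from collections import deque
from typing import Dict, List


def topological_order(depends_on: Dict[str, List[str]]) -> List[str]:
    """Return step names so that every step appears after the steps it depends on.

    `depends_on` maps each step name to the names of the steps it depends on.
    Raises ValueError on unknown dependencies or if the graph contains a cycle.
    """
    # Build edges (dependency -> dependents) and count unmet dependencies per step.
    dependents: Dict[str, List[str]] = {name: [] for name in depends_on}
    in_degree: Dict[str, int] = {name: 0 for name in depends_on}
    for step, deps in depends_on.items():
        for dep in deps:
            if dep not in dependents:
                raise ValueError(f"Step {step!r} depends on unknown step {dep!r}")
            dependents[dep].append(step)
            in_degree[step] += 1

    # Kahn's algorithm: repeatedly emit steps whose dependencies are all satisfied.
    ready = deque(name for name, degree in in_degree.items() if degree == 0)
    order: List[str] = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for nxt in dependents[current]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    # Any step never emitted is part of (or behind) a cycle.
    if len(order) != len(depends_on):
        raise ValueError("Cycle detected in step dependencies")
    return order


print(topological_order({
    "Process": [],
    "Train": ["Process"],
    "Evaluate": ["Train"],
    "Register": ["Evaluate", "Train"],
}))
# ['Process', 'Train', 'Evaluate', 'Register']
```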
- sagemaker.mlops.workflow.pipeline.format_start_parameters(parameters: Dict[str, Any]) -> List[Dict[str, Any]]
Formats start parameter overrides as a list of dicts.
This list of dicts adheres to the request schema of:
{"Name": "MyParameterName", "Value": "MyValue"}
- Parameters:
parameters (Dict[str, Any]) – A dict of named values where the keys are the names of the parameters to pass values into.
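The dict-to-list conversion can be sketched in one comprehension. This hypothetical format_start_parameters_sketch converts each value with str() on the assumption that the service's Value field is a string; that conversion is an assumption of the sketch, not a statement about the SDK function.

```python
from typing import Any, Dict, List


def format_start_parameters_sketch(parameters: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn {"name": value} into [{"Name": name, "Value": str(value)}, ...]."""
    # Each override becomes one {"Name": ..., "Value": ...} entry, in insertion order.
    return [{"Name": name, "Value": str(value)} for name, value in parameters.items()]


print(format_start_parameters_sketch({"Epochs": 20, "UseGpu": True}))
# [{'Name': 'Epochs', 'Value': '20'}, {'Name': 'UseGpu', 'Value': 'True'}]
```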
- sagemaker.mlops.workflow.pipeline.get_function_step_result(step_name: str, step_list: list, execution_id: str, sagemaker_session: Session)
Helper function to retrieve the output of a @step decorated function.
- Parameters:
step_name (str) – The name of the pipeline step.
step_list (list) – A list of executed pipeline steps of the specified execution.
execution_id (str) – The specified id of the pipeline execution.
sagemaker_session (Session) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed.
- Returns:
The step output.
- Raises:
ValueError – If the provided step is not a @step decorated function.
RemoteFunctionError – If the provided step is not in "Completed" status.
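The two documented failure modes can be pictured as a lookup followed by two guards. In the sketch below, the step records are hypothetical plain dicts (only StepName mirrors a real field; IsFunctionStep, Status, and Output are made up), StepNotCompletedError is a hypothetical stand-in for RemoteFunctionError, and the output is read from the record instead of being downloaded and deserialized as the real helper would do.

```python
from typing import Any, Dict, List


class StepNotCompletedError(Exception):
    """Hypothetical stand-in for RemoteFunctionError."""


def get_step_output_sketch(step_name: str, step_list: List[Dict[str, Any]]) -> Any:
    """Illustrate the lookup and the two guards described under Raises."""
    # Find the record for the requested step.
    step = next((s for s in step_list if s["StepName"] == step_name), None)
    if step is None or not step.get("IsFunctionStep", False):
        raise ValueError(f"{step_name!r} is not a @step decorated function")
    if step.get("Status") != "Completed":
        raise StepNotCompletedError(f"{step_name!r} is not in 'Completed' status")
    # The real helper retrieves the function's return value; here it is stored on the record.
    return step["Output"]


steps = [
    {"StepName": "Train", "IsFunctionStep": True, "Status": "Completed", "Output": 0.93},
    {"StepName": "Evaluate", "IsFunctionStep": True, "Status": "InProgress"},
]
print(get_step_output_sketch("Train", steps))  # 0.93
```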
- sagemaker.mlops.workflow.pipeline.interpolate(request_obj: Dict[str, Any] | List[Dict[str, Any]], callback_output_to_step_map: Dict[str, str], lambda_output_to_step_map: Dict[str, str], pipeline_name: str) -> Dict[str, Any] | List[Dict[str, Any]]
Replaces Parameter values in a list of nested Dict[str, Any] with their workflow expression.
- Parameters:
request_obj (RequestType) – The request dict.
callback_output_to_step_map (Dict[str, str]) – A dict of output name -> step name.
lambda_output_to_step_map (Dict[str, str]) – A dict of output name -> step name.
pipeline_name (str) – The name of the pipeline to be interpolated.
- Returns:
The request dict with Parameter values replaced by their expression.
- Return type:
RequestType
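The core of the replacement is a recursive walk over nested dicts and lists. The sketch below uses a hypothetical minimal Parameter class whose expression has the {"Get": "Parameters.<name>"} form used in pipeline definitions. It deliberately ignores the callback and Lambda output maps and the pipeline name that the real function also takes.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class Parameter:
    """Hypothetical minimal stand-in for a pipeline parameter."""

    name: str

    @property
    def expr(self) -> Dict[str, str]:
        # Pipeline definitions refer to parameters through a "Get" expression.
        return {"Get": f"Parameters.{self.name}"}


def interpolate_parameters(obj: Any) -> Any:
    """Recursively replace Parameter objects in nested dicts/lists with their expressions."""
    if isinstance(obj, Parameter):
        return obj.expr
    if isinstance(obj, dict):
        return {key: interpolate_parameters(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [interpolate_parameters(item) for item in obj]
    # Plain values (strings, numbers, None) are left unchanged.
    return obj


request = {
    "Arguments": {
        "InstanceType": Parameter("InstanceType"),
        "Inputs": [Parameter("InputData"), "s3://example-bucket/extra"],
    }
}
print(interpolate_parameters(request))
```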
- sagemaker.mlops.workflow.pipeline.update_args(args: Dict[str, Any], **kwargs)
Updates the request arguments dict with a value, if populated.
This handles the case where the service API does not accept None as an argument value.
- Parameters:
args (Dict[str, Any]) – The request arguments dict.
kwargs – Key-value pairs to update the args dict with. Pairs whose value is None are skipped.
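The "only if populated" rule amounts to skipping None values while copying keyword arguments into the request dict in place. The sketch below is a hypothetical re-statement of that rule, named update_args_sketch to avoid implying it is the SDK function; the role ARN is a placeholder.

```python
from typing import Any, Dict


def update_args_sketch(args: Dict[str, Any], **kwargs: Any) -> None:
    """Copy only the keyword arguments whose value is not None into args (in place)."""
    for key, value in kwargs.items():
        if value is not None:
            args[key] = value


request_args = {"PipelineName": "my-pipeline"}
update_args_sketch(
    request_args,
    PipelineDescription=None,  # skipped, so the key never reaches the API call
    RoleArn="arn:aws:iam::111122223333:role/Example",
)
print(request_args)
# {'PipelineName': 'my-pipeline', 'RoleArn': 'arn:aws:iam::111122223333:role/Example'}
```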