sagemaker.mlops.workflow.steps#

The Step definitions for SageMaker Pipelines Workflows.

Classes

CacheConfig([enable_caching, expire_after])

Configuration class to enable caching in SageMaker Pipelines Workflows.

ConfigurableRetryStep(name, step_type[, ...])

ConfigurableRetryStep for SageMaker Pipelines Workflows.

ProcessingStep(name[, step_args, ...])

ProcessingStep for SageMaker Pipelines Workflows.

Step(name[, display_name, description, ...])

Pipeline Step for SageMaker Pipelines Workflows.

StepTypeEnum(value)

Enum of Step types.

TrainingStep(name[, step_args, ...])

TrainingStep for SageMaker Pipelines Workflows.

TransformStep(name[, step_args, ...])

TransformStep for SageMaker Pipelines Workflows.

TuningStep(name[, step_args, display_name, ...])

TuningStep for SageMaker Pipelines Workflows.

class sagemaker.mlops.workflow.steps.CacheConfig(enable_caching: bool = False, expire_after=None)[source]#

Bases: object

Configuration class to enable caching in SageMaker Pipelines Workflows.

If caching is enabled, the pipeline attempts to find a previous execution of a Step that was called with the same arguments. Step caching only considers successful executions. If a successful previous execution is found, the pipeline propagates the values from that execution rather than recomputing the Step. When multiple successful executions exist within the timeout period, the pipeline uses the result of the most recent one.
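The selection rule described above can be sketched with the standard library alone. This is a simplified stand-in, not the service's implementation: `Execution`, `find_cache_hit`, and their fields are hypothetical names, and the expiry window is passed as a `timedelta` rather than as an ISO 8601 string.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class Execution:
    """Hypothetical record of one previous Step execution."""

    arguments: dict
    succeeded: bool
    finished_at: datetime
    result: str


def find_cache_hit(
    history: List[Execution],
    arguments: dict,
    expire_after: timedelta,
    now: datetime,
) -> Optional[Execution]:
    """Return the most recent successful execution with equal arguments
    that finished within the expiry window, or None on a cache miss."""
    candidates = [
        e
        for e in history
        if e.succeeded
        and e.arguments == arguments
        and now - e.finished_at <= expire_after
    ]
    return max(candidates, key=lambda e: e.finished_at, default=None)


now = datetime(2024, 1, 31)
history = [
    Execution({"lr": 0.1}, True, datetime(2024, 1, 10), "run-a"),
    Execution({"lr": 0.1}, True, datetime(2024, 1, 20), "run-b"),
    Execution({"lr": 0.1}, False, datetime(2024, 1, 30), "run-c"),  # failed, so ignored
    Execution({"lr": 0.5}, True, datetime(2024, 1, 29), "run-d"),  # different arguments
]
hit = find_cache_hit(history, {"lr": 0.1}, timedelta(days=30), now)
miss = find_cache_hit(history, {"lr": 0.1}, timedelta(days=5), now)
```

With a 30-day window the lookup picks `run-b`, the newest successful match. With a 5-day window nothing qualifies, so the Step would be recomputed.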

enable_caching#

Whether to enable Step caching. Defaults to False.

Type:

bool

expire_after#

If Step caching is enabled, a timeout also needs to be defined. It defines how old a previous execution can be and still be considered for reuse. The value should be an ISO 8601 duration string. Defaults to None.

Examples:

'p30d' # 30 days
'P4DT12H' # 4 days and 12 hours
'T12H' # 12 hours

Type:

str
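To make the duration format concrete, here is a minimal sketch that converts strings like the examples above into a `timedelta`. The helper `parse_duration` is hypothetical and is not part of the SDK. It handles only weeks, days, hours, minutes, and seconds, and it assumes that a lowercase spelling and a missing leading `P` are acceptable because the examples above use them. How the service itself validates the string is not stated here.

```python
import re
from datetime import timedelta

# Optional leading "P", optional week/day fields, then an optional "T" time section.
_DURATION = re.compile(
    r"^P?(?:(?P<weeks>\d+)W)?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$",
    re.IGNORECASE,
)


def parse_duration(text: str) -> timedelta:
    """Convert a duration string such as 'P4DT12H' into a timedelta."""
    match = _DURATION.match(text)
    # Keep only the fields that were actually present in the string.
    parts = (
        {name: int(value) for name, value in match.groupdict().items() if value}
        if match
        else {}
    )
    if not parts:
        raise ValueError(f"unsupported duration string: {text!r}")
    return timedelta(**parts)


thirty_days = parse_duration("p30d")
four_and_a_half_days = parse_duration("P4DT12H")
twelve_hours = parse_duration("T12H")
```

Year and month fields are left out on purpose, since they have no fixed length and do not map onto `timedelta`.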

property config#

Configures Step caching for SageMaker Pipelines Workflows.

enable_caching: bool#
class sagemaker.mlops.workflow.steps.ConfigurableRetryStep(name: str, step_type: StepTypeEnum, display_name: str | None = None, description: str | None = None, depends_on: List[str | Step | StepCollection] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: Step

ConfigurableRetryStep for SageMaker Pipelines Workflows.

add_retry_policy(retry_policy: RetryPolicy)[source]#

Add a policy to the current ConfigurableRetryStep retry policies list.

to_request() → Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for ConfigurableRetryStep.

class sagemaker.mlops.workflow.steps.ProcessingStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, property_files: List[PropertyFile] | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

ProcessingStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_processing_job.

NOTE: The arguments that the workflow needs differ slightly from the CreateProcessingJob request. ExperimentConfig cannot be included in the arguments.

property properties#

A Properties object representing the DescribeProcessingJobResponse data model.

to_request() → Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for workflow service calls.

class sagemaker.mlops.workflow.steps.Step(name: str, display_name: str | None = None, description: str | None = None, step_type: StepTypeEnum = None, depends_on: List[str | Step | StepCollection | StepOutput] | None = None)[source]#

Bases: Entity

Pipeline Step for SageMaker Pipelines Workflows.

add_depends_on(step_names: List[str | Step | StepCollection | StepOutput])[source]#

Add Step names or Step instances to the current Step's depends_on list.

abstract property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments to the particular Step service call.

property depends_on: List[str | Step | StepCollection | StepOutput] | None#

The list of steps the current Step depends on.

abstract property properties#

The properties of the particular Step.

property ref: Dict[str, str]#

Gets a reference dictionary for Step instances.

property step_only_arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments to this Step only.

Compound Steps such as the ConditionStep have to override this property to return only the arguments that pertain to that step.

to_request() → Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for workflow service calls.

class sagemaker.mlops.workflow.steps.StepTypeEnum(value)[source]#

Bases: Enum

Enum of Step types.

AUTOML = 'AutoML'#
CALLBACK = 'Callback'#
CLARIFY_CHECK = 'ClarifyCheck'#
CONDITION = 'Condition'#
CREATE_MODEL = 'Model'#
EMR = 'EMR'#
EMR_SERVERLESS = 'EMRServerless'#
FAIL = 'Fail'#
LAMBDA = 'Lambda'#
PROCESSING = 'Processing'#
QUALITY_CHECK = 'QualityCheck'#
REGISTER_MODEL = 'RegisterModel'#
TRAINING = 'Training'#
TRANSFORM = 'Transform'#
TUNING = 'Tuning'#
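
Because StepTypeEnum derives from Enum, members can be looked up either by value or by name. The sketch below redeclares a small subset of the members listed above as a local stand-in so that it runs without the SDK installed. It is not the library class itself. Note that the member name and value differ for CREATE_MODEL.

```python
from enum import Enum


class StepTypeEnum(Enum):
    """Local stand-in that copies a subset of the members listed above."""

    CREATE_MODEL = "Model"
    PROCESSING = "Processing"
    TRAINING = "Training"


# Lookup by value uses call syntax; lookup by name uses subscript syntax.
by_value = StepTypeEnum("Model")
by_name = StepTypeEnum["TRAINING"]

print(by_value.name, by_value.value)
print(by_name.name, by_name.value)
```
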
class sagemaker.mlops.workflow.steps.TrainingStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

TrainingStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_training_job.

NOTE: The arguments that the workflow needs differ slightly from the CreateTrainingJob request.

property properties#

A Properties object representing the DescribeTrainingJobResponse data model.

to_request() → Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the request dictionary with cache configuration.

class sagemaker.mlops.workflow.steps.TransformStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

TransformStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_transform_job.

NOTE: The arguments that the workflow needs differ slightly from the CreateTransformJob request. ExperimentConfig cannot be included in the arguments.

property properties#

A Properties object representing the DescribeTransformJobResponse data model.

to_request() → Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration.

class sagemaker.mlops.workflow.steps.TuningStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

TuningStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_hyper_parameter_tuning_job.

NOTE: The arguments that the workflow needs differ slightly from the CreateHyperParameterTuningJob request.

get_top_model_s3_uri(top_k: int, s3_bucket: str, prefix: str = '') → Join[source]#

Get the model artifact S3 URI from the top performing training jobs.

Parameters:
  • top_k (int) – The index of the top-performing training job. The tuning step stores up to 50 top-performing training jobs, so a valid top_k value is from 0 to 49. The best training job model is at index 0.

  • s3_bucket (str) – The S3 bucket to store the training job output artifact.

  • prefix (str) – The S3 key prefix to store the training job output artifact.
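
The zero-based top_k indexing can be illustrated with plain strings. The sketch below is not the SDK method: `top_model_uri_sketch` and the job names are hypothetical, and the `s3://bucket/prefix/job-name/output/model.tar.gz` layout is an assumption made for illustration. The real method returns a Join object that is resolved when the pipeline runs, not a string.

```python
from typing import List

MAX_TOP_JOBS = 50  # the tuning step stores up to 50 top-performing training jobs


def top_model_uri_sketch(
    ranked_job_names: List[str], top_k: int, s3_bucket: str, prefix: str = ""
) -> str:
    """Build an illustrative model artifact URI for the job at rank top_k.

    ranked_job_names is assumed to be ordered best first, so index 0 is the
    best training job.
    """
    if not 0 <= top_k < MAX_TOP_JOBS:
        raise ValueError(f"top_k must be between 0 and {MAX_TOP_JOBS - 1}, got {top_k}")
    parts = ["s3:/", s3_bucket]
    if prefix:
        parts.append(prefix)
    # Assumed layout: <job name>/output/model.tar.gz under the bucket and prefix.
    parts += [ranked_job_names[top_k], "output/model.tar.gz"]
    return "/".join(parts)


ranked = ["job-best", "job-second"]  # hypothetical job names, best first
best_uri = top_model_uri_sketch(ranked, 0, "my-bucket", "tuning")
second_uri = top_model_uri_sketch(ranked, 1, "my-bucket")
```
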

property properties#

A Properties object representing the DescribeHyperParameterTuningJobResponse and ListTrainingJobsForHyperParameterTuningJobResponse data models.

to_request() → Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration.