sagemaker.core.workflow.utilities

Utilities to support workflow.

Functions

execute_job_functions(step_args)

Execute the job class functions during pipeline definition construction.

get_code_hash(step)

Get the hash of the code artifact(s) for the given step.

get_config_hash(step)

Get the hash of the config artifact(s) for the given step.

get_processing_code_hash(code, source_dir, ...)

Get the hash of a processing step's code artifact(s).

get_processing_dependencies(dependency_args)

Get the Processing job dependencies from the processor run kwargs.

get_training_code_hash(entry_point, source_dir)

Get the hash of a training step's code artifact(s).

hash_file(path)

Get the SHA256 hash of a file.

hash_files_or_dirs(paths)

Get the SHA256 hash of the contents of a list of files or directories.

hash_object(obj)

Get the SHA256 hash of an object.

list_to_request(entities)

Get the request structure for a list of entities.

load_step_compilation_context()

Load the step compilation context from the static _pipeline_config variable.

override_pipeline_parameter_var(func)

A decorator to override pipeline Parameters passed into a function.

step_compilation_context_manager(...[, ...])

Expose the static _pipeline_config variable to other modules.

trim_request_dict(request_dict, job_key, config)

Remove unwanted fields from request_dict so they are not persisted in step arguments.

validate_step_args_input(step_args, ...)

Validate the _StepArguments object that is passed into a pipeline step.

sagemaker.core.workflow.utilities.execute_job_functions(step_args: _StepArguments)

Execute the job class functions during pipeline definition construction.

Executes the job functions, such as run(), fit(), or transform(), whose execution was delayed until the pipeline is built, for steps built with a PipelineSession.

Handles multiple functions in cases where job functions are chained together through the inheritance of different job classes (e.g. PySparkProcessor, ScriptProcessor, and Processor).

Parameters:

step_args (_StepArguments) – A _StepArguments object to be used for composing a pipeline step; contains the necessary function information.
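
The delayed-execution pattern can be illustrated with a small, self-contained sketch. DelayedStepArguments and its fields are hypothetical stand-ins for the SDK's internal _StepArguments, not the real API:

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


# Hypothetical stand-in for _StepArguments: records a delayed job function
# plus the arguments it should eventually be called with.
@dataclass
class DelayedStepArguments:
    func: Callable[..., Any]
    func_args: tuple = ()
    func_kwargs: Optional[dict] = None
    # Chained parent call (e.g. a subclass run() wrapping a base-class run()).
    chained: Optional["DelayedStepArguments"] = None


def execute_job_functions(step_args: DelayedStepArguments) -> None:
    """Walk the chain and execute each delayed job function in order."""
    current: Optional[DelayedStepArguments] = step_args
    while current is not None:
        current.func(*current.func_args, **(current.func_kwargs or {}))
        current = current.chained
```

With a chain built from a subclass call wrapping a base-class call, the outermost function runs first and the chained parent follows.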

sagemaker.core.workflow.utilities.get_code_hash(step: Entity) → str

Get the hash of the code artifact(s) for the given step.

Parameters:

step (Entity) – A pipeline step object (typed as Entity because importing Step would cause a circular import).

Returns:

A hash string representing the unique code artifact(s) for the step

Return type:

str

sagemaker.core.workflow.utilities.get_config_hash(step: Entity)

Get the hash of the config artifact(s) for the given step.

Parameters:

step (Entity) – A pipeline step object (typed as Entity because importing Step would cause a circular import).

Returns:

A hash string representing the unique config artifact(s) for the step

Return type:

str

sagemaker.core.workflow.utilities.get_processing_code_hash(code: str, source_dir: str, dependencies: List[str]) → str

Get the hash of a processing step’s code artifact(s).

Parameters:
  • code (str) – Path to a file with the processing script to run

  • source_dir (str) – Path to a directory with any other processing source code dependencies aside from the entry point file

  • dependencies (List[str]) – A list of paths to directories (absolute or relative) with any additional libraries that will be exported to the container

Returns:

A hash string representing the unique code artifact(s) for the step

Return type:

str
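
A minimal sketch of the idea, assuming the combined hash is simply SHA256 over the contents of the entry script and any dependency files. processing_code_hash is a hypothetical name, and the sketch handles plain files only; the real helper also hashes source_dir and walks dependency directories:

```python
import hashlib
from typing import List, Optional


def processing_code_hash(code: str,
                         dependencies: Optional[List[str]] = None) -> str:
    """SHA256 over the entry script plus any dependency files, so that
    editing any of them produces a different step hash."""
    sha = hashlib.sha256()
    for path in [code] + list(dependencies or []):
        with open(path, "rb") as f:
            sha.update(f.read())
    return sha.hexdigest()
```

Because the digest is derived from file contents, any edit to the processing script or its dependencies changes the resulting hash.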

sagemaker.core.workflow.utilities.get_processing_dependencies(dependency_args: List[List[str]]) → List[str]

Get the Processing job dependencies from the processor run kwargs.

Parameters:

dependency_args (List[List[str]]) – A list of dependency args from processor.run().

Returns:

A list of code dependencies for the job

Return type:

List[str]
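
Conceptually this merges the dependency lists supplied across chained processor.run() calls into a single list, skipping calls that supplied none. A hedged sketch (merge_processing_dependencies is a hypothetical name):

```python
from typing import List, Optional


def merge_processing_dependencies(
    dependency_args: List[Optional[List[str]]],
) -> List[str]:
    """Flatten dependency lists gathered from chained run() calls,
    skipping entries that were never supplied (None)."""
    dependencies: List[str] = []
    for args in dependency_args:
        if args:
            dependencies.extend(args)
    return dependencies
```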

sagemaker.core.workflow.utilities.get_training_code_hash(entry_point: str, source_dir: str, dependencies: str | None = None) → str

Get the hash of a training step’s code artifact(s).

Parameters:
  • entry_point (str) – The absolute or relative path to the local Python source file that should be executed as the entry point to training

  • source_dir (str) – Path to a directory with any other training source code dependencies aside from the entry point file

  • dependencies (Optional[str]) – The relative path within source_dir to a requirements.txt file with any additional libraries that will be exported to the container

Returns:

A hash string representing the unique code artifact(s) for the step

Return type:

str

sagemaker.core.workflow.utilities.hash_file(path: str) → str

Get the SHA256 hash of a file.

Parameters:

path (str) – The local path for the file.

Returns:

The SHA256 hash of the file.

Return type:

str
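
The behavior can be approximated with the standard library's hashlib; sha256_of_file below is a hypothetical equivalent that streams the file in chunks rather than reading it into memory at once:

```python
import hashlib


def sha256_of_file(path: str, chunk_size: int = 1 << 20) -> str:
    """Stream the file through SHA256 in fixed-size chunks so large
    files are hashed without loading them fully into memory."""
    sha = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            sha.update(chunk)
    return sha.hexdigest()
```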

sagemaker.core.workflow.utilities.hash_files_or_dirs(paths: List[str]) → str

Get the SHA256 hash of the contents of a list of files or directories.

Hash is changed if:
  • input list is changed

  • new nested directories/files are added to any directory in the input list

  • nested directory/file names are changed for any of the inputted directories

  • content of files is edited

Parameters:

paths – List of file or directory paths

Returns:

The SHA256 hash of the list of files or directories.

Return type:

str
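
A sketch that satisfies the four rules above, assuming the digest covers both relative file names and file contents, with the directory walk made deterministic (sha256_of_paths is a hypothetical name):

```python
import hashlib
import os
from typing import List


def sha256_of_paths(paths: List[str]) -> str:
    """Hash relative file names and file contents, so the digest changes
    when files are added, renamed, or edited within any input path."""
    sha = hashlib.sha256()
    for path in sorted(paths):
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path):
                dirs.sort()  # deterministic traversal order
                for name in sorted(files):
                    full = os.path.join(root, name)
                    sha.update(os.path.relpath(full, path).encode())
                    with open(full, "rb") as f:
                        sha.update(f.read())
        else:
            sha.update(os.path.basename(path).encode())
            with open(path, "rb") as f:
                sha.update(f.read())
    return sha.hexdigest()
```

Sorting the input list and the walk order keeps the digest stable across runs, while folding in relative names makes renames (not just edits) visible in the hash.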

sagemaker.core.workflow.utilities.hash_object(obj) → str

Get the SHA256 hash of an object.

Parameters:

obj (dict) – The object

Returns:

The SHA256 hash of the object

Return type:

str
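
One common way to hash a dict deterministically, which this sketch assumes (the SDK's exact serialization may differ), is to serialize with sorted keys before hashing:

```python
import hashlib
import json


def sha256_of_object(obj: dict) -> str:
    """Serialize with sorted keys so logically equal dicts always
    produce the same digest, regardless of insertion order."""
    blob = json.dumps(obj, sort_keys=True, default=str).encode()
    return hashlib.sha256(blob).hexdigest()
```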

sagemaker.core.workflow.utilities.list_to_request(entities: Sequence[Entity | StepCollection]) → List[Dict[str, Any] | List[Dict[str, Any]]]

Get the request structure for a list of entities.

Parameters:

entities (Sequence[Entity | StepCollection]) – A list of entities.

Returns:

A request structure for a workflow service call.

Return type:

list
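
A hedged sketch of the shape of this conversion, using hypothetical FakeStep and FakeStepCollection classes in place of the real Entity and StepCollection types; it assumes a collection contributes one request dict per contained step:

```python
from typing import Any, Dict, List


class FakeStep:
    """Hypothetical entity: serializes itself to a single request dict."""
    def __init__(self, name: str) -> None:
        self.name = name

    def to_request(self) -> Dict[str, Any]:
        return {"Name": self.name}


class FakeStepCollection:
    """Hypothetical step collection: serializes each contained step."""
    def __init__(self, steps: List[FakeStep]) -> None:
        self.steps = steps

    def request_dicts(self) -> List[Dict[str, Any]]:
        return [s.to_request() for s in self.steps]


def list_to_request(entities) -> List[Dict[str, Any]]:
    """Build the request structure for a mixed list of steps/collections."""
    requests: List[Dict[str, Any]] = []
    for entity in entities:
        if isinstance(entity, FakeStepCollection):
            requests.extend(entity.request_dicts())
        else:
            requests.append(entity.to_request())
    return requests
```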

sagemaker.core.workflow.utilities.load_step_compilation_context()

Load the step compilation context from the static _pipeline_config variable.

Returns:

A context object containing information about the current step.

Return type:

_PipelineConfig

sagemaker.core.workflow.utilities.override_pipeline_parameter_var(func)

A decorator to override pipeline Parameters passed into a function.

This is a temporary decorator that overrides pipeline Parameter objects with their default values and displays a warning instructing users to update their code.

It gives users a grace period to update their code when we make changes that explicitly prevent passing pipeline variables to a function.

We should remove this decorator after the grace period.
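
A sketch of such a decorator, with FakeParameter standing in for a real pipeline Parameter (hypothetical names; the real decorator's warning text and detection logic differ):

```python
import functools
import warnings
from dataclasses import dataclass
from typing import Any


# Hypothetical stand-in for a pipeline Parameter carrying a default value.
@dataclass
class FakeParameter:
    name: str
    default_value: Any


def override_pipeline_parameter_var(func):
    """Replace any Parameter argument with its default value and warn,
    giving callers a grace period to update their code."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        def resolve(value):
            if isinstance(value, FakeParameter):
                warnings.warn(
                    f"Parameter {value.name} is not allowed here; "
                    "using its default value instead.",
                    UserWarning,
                )
                return value.default_value
            return value

        args = tuple(resolve(a) for a in args)
        kwargs = {k: resolve(v) for k, v in kwargs.items()}
        return func(*args, **kwargs)
    return wrapper
```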

sagemaker.core.workflow.utilities.step_compilation_context_manager(pipeline_name: str, step_name: str, sagemaker_session, code_hash: str, config_hash: str, pipeline_definition_config: PipelineDefinitionConfig, upload_runtime_scripts: bool, upload_workspace: bool, pipeline_build_time: str, function_step_secret_token: str | None = None)

Expose the static _pipeline_config variable to other modules.

Parameters:
  • pipeline_name (str) – pipeline name

  • step_name (str) – step name

  • sagemaker_session (sagemaker.core.helper.session.Session) – a sagemaker session

  • code_hash (str) – a hash of the code artifact for the particular step

  • config_hash (str) – a hash of the config artifact for the particular step (Processing)

  • pipeline_definition_config (PipelineDefinitionConfig) – a configuration used to toggle feature flags persistent in a pipeline definition

  • upload_runtime_scripts (bool) – flag used to manage the upload of runtime scripts to S3 for a _FunctionStep in the pipeline

  • upload_workspace (bool) – flag used to manage the upload of the workspace to S3 for a _FunctionStep in the pipeline

  • pipeline_build_time (str) – timestamp when the pipeline is being built

  • function_step_secret_token (str) – secret token used for the function step checksum
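
The publish/consume pattern behind _pipeline_config can be sketched with a module-level variable and contextlib.contextmanager; StepContext and its fields are a hypothetical subset of the real context:

```python
import contextlib
from dataclasses import dataclass
from typing import Optional

# Module-level slot standing in for the SDK's static _pipeline_config.
_pipeline_config: Optional["StepContext"] = None


@dataclass
class StepContext:
    """Hypothetical subset of the step compilation context fields."""
    pipeline_name: str
    step_name: str
    code_hash: str = ""
    config_hash: str = ""


@contextlib.contextmanager
def step_compilation_context_manager(pipeline_name: str, step_name: str):
    """Publish the context while a step is being compiled, then clear it."""
    global _pipeline_config
    _pipeline_config = StepContext(pipeline_name, step_name)
    try:
        yield _pipeline_config
    finally:
        _pipeline_config = None


def load_step_compilation_context() -> Optional[StepContext]:
    """Read whatever context the manager above has published."""
    return _pipeline_config
```

Because the context lives in a module-level variable, any module can read it via load_step_compilation_context() while a step is being compiled, without the context being threaded through every call.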

sagemaker.core.workflow.utilities.trim_request_dict(request_dict, job_key, config)

Remove unwanted fields from request_dict so they are not persisted in step arguments.

Trims the job_name field off request_dict in cases where we do not want to include it in the pipeline definition.

Parameters:
  • request_dict (dict) – A dictionary used to build the arguments for a pipeline step, containing fields that will be passed to job client during orchestration.

  • job_key (str) – The key in a step’s arguments to look up the base_job_name if it exists

  • config (_pipeline_config) – The step compilation context exposed by the context manager
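
A minimal sketch of the trimming, assuming the job-name field is dropped unless the caller asks to keep it (the real function consults the pipeline definition config rather than a boolean, and the exact key names vary by job type):

```python
def trim_request_dict(request_dict: dict, job_key: str,
                      keep_job_name: bool = False) -> dict:
    """Drop the generated job-name field so it is not frozen into the
    pipeline definition; a fresh name can be generated at execution time."""
    if not keep_job_name:
        request_dict.pop(job_key, None)
    return request_dict
```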

sagemaker.core.workflow.utilities.validate_step_args_input(step_args: _StepArguments, expected_caller: Set[str], error_message: str)

Validate the _StepArguments object that is passed into a pipeline step.

Parameters:
  • step_args (_StepArguments) – A _StepArguments object to be used for composing a pipeline step.

  • expected_caller (Set[str]) – The expected name of the caller function which is intercepted by the PipelineSession to get the step arguments.

  • error_message (str) – The error message to be thrown if the validation fails.
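
A sketch of the validation, with FakeStepArguments standing in for the real _StepArguments (hypothetical; the real check inspects the intercepted caller recorded by the PipelineSession):

```python
from typing import Set


class FakeStepArguments:
    """Hypothetical stand-in recording which session method produced it."""
    def __init__(self, caller_name: str) -> None:
        self.caller_name = caller_name


def validate_step_args_input(step_args, expected_caller: Set[str],
                             error_message: str) -> None:
    """Reject arguments that did not come from an expected job call."""
    if not isinstance(step_args, FakeStepArguments):
        raise TypeError(error_message)
    if step_args.caller_name not in expected_caller:
        raise ValueError(error_message)
```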