sagemaker.core.workflow.utilities#
Utilities to support workflow.
Functions

- execute_job_functions — Execute the job class functions during pipeline definition construction
- get_code_hash — Get the hash of the code artifact(s) for the given step
- get_config_hash — Get the hash of the config artifact(s) for the given step
- get_processing_code_hash — Get the hash of a processing step's code artifact(s)
- get_processing_dependencies — Get the Processing job dependencies from the processor run kwargs
- get_training_code_hash — Get the hash of a training step's code artifact(s)
- hash_file — Get the SHA256 hash of a file
- hash_files_or_dirs — Get the SHA256 hash of the contents of a list of files or directories
- hash_object — Get the SHA256 hash of an object
- list_to_request — Get the request structure for a list of entities
- load_step_compilation_context — Load the step compilation context from the static _pipeline_config variable
- override_pipeline_parameter_var — A decorator to override pipeline Parameters passed into a function
- step_compilation_context_manager — Expose the static _pipeline_config variable to other modules
- trim_request_dict — Trim unwanted fields from request_dict so they are not persisted in step arguments
- validate_step_args_input — Validate the _StepArguments object passed into a pipeline step
- sagemaker.core.workflow.utilities.execute_job_functions(step_args: _StepArguments)[source]#
Execute the job class functions during pipeline definition construction
Executes the job functions such as run(), fit(), or transform() that have been delayed until the pipeline gets built, for steps built with a PipelineSession.
Handles multiple functions in instances where job functions are chained together from the inheritance of different job classes (e.g. PySparkProcessor, ScriptProcessor, and Processor).
- Parameters:
step_args (_StepArguments) – A _StepArguments object to be used for composing a pipeline step, contains the necessary function information
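The deferred-execution pattern described above can be sketched as follows. `StepArgs` and its `calls` list are hypothetical stand-ins for the internal `_StepArguments` object, used only to illustrate how chained job functions are replayed at definition time:

```python
# Hypothetical stand-in for _StepArguments: a record of job functions whose
# execution was deferred until pipeline definition time (names are assumptions).
class StepArgs:
    def __init__(self):
        self.calls = []  # (func, args, kwargs) tuples captured at definition time

    def defer(self, func, *args, **kwargs):
        self.calls.append((func, args, kwargs))


def execute_job_functions(step_args):
    # Replay each deferred job function (e.g. run(), fit(), transform()) in the
    # order it was chained, as happens when the pipeline definition is built.
    result = None
    for func, args, kwargs in step_args.calls:
        result = func(*args, **kwargs)
    return result
```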
- sagemaker.core.workflow.utilities.get_code_hash(step: Entity) str[source]#
Get the hash of the code artifact(s) for the given step
- Parameters:
step (Entity) – A pipeline step object (Entity type because Step causes circular import)
- Returns:
A hash string representing the unique code artifact(s) for the step
- Return type:
str
- sagemaker.core.workflow.utilities.get_config_hash(step: Entity)[source]#
Get the hash of the config artifact(s) for the given step
- Parameters:
step (Entity) – A pipeline step object (Entity type because Step causes circular import)
- Returns:
A hash string representing the unique config artifact(s) for the step
- Return type:
str
- sagemaker.core.workflow.utilities.get_processing_code_hash(code: str, source_dir: str, dependencies: List[str]) str[source]#
Get the hash of a processing step’s code artifact(s).
- Parameters:
code (str) – Path to a file with the processing script to run
source_dir (str) – Path to a directory with any other processing source code dependencies aside from the entry point file
dependencies (List[str]) – A list of paths to directories (absolute or relative) with any additional libraries that will be exported to the container
- Returns:
A hash string representing the unique code artifact(s) for the step
- Return type:
str
- sagemaker.core.workflow.utilities.get_processing_dependencies(dependency_args: List[List[str]]) List[str][source]#
Get the Processing job dependencies from the processor run kwargs
- Parameters:
dependency_args – A list of dependency args from processor.run()
- Returns:
A list of code dependencies for the job
- Return type:
List[str]
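A plausible sketch of this flattening, assuming each entry in `dependency_args` is either a list of paths or `None` when the corresponding run() call supplied no dependencies:

```python
def get_processing_dependencies(dependency_args):
    # Flatten the per-call dependency lists into one list, skipping unset entries.
    dependencies = []
    for args in dependency_args:
        if args:
            dependencies.extend(args)
    return dependencies
```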
- sagemaker.core.workflow.utilities.get_training_code_hash(entry_point: str, source_dir: str, dependencies: str | None = None) str[source]#
Get the hash of a training step’s code artifact(s).
- Parameters:
entry_point (str) – The absolute or relative path to the local Python source file that should be executed as the entry point to training
source_dir (str) – Path to a directory with any other training source code dependencies aside from the entry point file
dependencies (Optional[str]) – The relative path within source_dir to a requirements.txt file with any additional libraries that will be exported to the container
- Returns:
A hash string representing the unique code artifact(s) for the step
- Return type:
str
- sagemaker.core.workflow.utilities.hash_file(path: str) str[source]#
Get the SHA256 hash of a file.
- Parameters:
path (str) – The local path for the file.
- Returns:
The SHA256 hash of the file.
- Return type:
str
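A minimal sketch of a SHA256 file hash with the standard library; the real implementation may differ in chunk size and error handling:

```python
import hashlib


def hash_file(path):
    # Stream the file in chunks so large files do not need to fit in memory.
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()
```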
- sagemaker.core.workflow.utilities.hash_files_or_dirs(paths: List[str]) str[source]#
Get the SHA256 hash of the contents of a list of files or directories.
- The hash changes if:
the input list is changed
new nested directories/files are added to any directory in the input list
nested directory/file names are changed for any of the input directories
the content of any file is edited
- Parameters:
paths – List of file or directory paths
- Returns:
The SHA256 hash of the list of files or directories.
- Return type:
str
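A sketch of one way to meet those guarantees, folding both relative names and file contents into a single digest so that renames, additions, and content edits all change the hash; the real traversal order and separators may differ:

```python
import hashlib
import os


def hash_files_or_dirs(paths):
    digest = hashlib.sha256()
    for path in paths:  # input-list order matters
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path):
                dirs.sort()  # make the walk deterministic
                for name in sorted(files):
                    full = os.path.join(root, name)
                    # Folding in the relative name means renames change the hash.
                    digest.update(os.path.relpath(full, path).encode())
                    with open(full, "rb") as f:
                        digest.update(f.read())
        else:
            digest.update(os.path.basename(path).encode())
            with open(path, "rb") as f:
                digest.update(f.read())
    return digest.hexdigest()
```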
- sagemaker.core.workflow.utilities.hash_object(obj) str[source]#
Get the SHA256 hash of an object.
- Parameters:
obj (dict) – The object
- Returns:
The SHA256 hash of the object
- Return type:
str
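A minimal sketch of hashing a dict-like object deterministically; the JSON serialization and the `default=str` fallback are assumptions, not the library's actual scheme:

```python
import hashlib
import json


def hash_object(obj):
    # Serialize with sorted keys so logically equal dicts hash identically;
    # assumes obj is (mostly) JSON-serializable, with str() as a fallback.
    payload = json.dumps(obj, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```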
- sagemaker.core.workflow.utilities.list_to_request(entities: Sequence[Entity | StepCollection]) List[Dict[str, Any] | List[Dict[str, Any]]][source]#
Get the request structure for a list of entities.
- Parameters:
entities (Sequence[Entity]) – A list of entities.
- Returns:
A request structure for a workflow service call.
- Return type:
list
- sagemaker.core.workflow.utilities.load_step_compilation_context()[source]#
Load the step compilation context from the static _pipeline_config variable
- Returns:
a context object containing information about the current step
- Return type:
_PipelineConfig
- sagemaker.core.workflow.utilities.override_pipeline_parameter_var(func)[source]#
A decorator to override pipeline Parameters passed into a function
This is a temporary decorator to override pipeline Parameter objects with their default value and display warning information to instruct users to update their code.
This decorator can help to give a grace period for users to update their code when we make changes to explicitly prevent passing any pipeline variables to a function.
We should remove this decorator after the grace period.
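The decorator's behavior can be sketched like this; `FakeParameter` is a hypothetical stand-in for a pipeline Parameter, and the warning text is illustrative, not the library's actual message:

```python
import functools
import warnings


class FakeParameter:
    # Hypothetical stand-in for a pipeline Parameter; only default_value matters here.
    def __init__(self, name, default_value):
        self.name = name
        self.default_value = default_value


def override_pipeline_parameter_var(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        def resolve(value):
            if isinstance(value, FakeParameter):
                warnings.warn(
                    f"{value.name} should not be a pipeline variable; "
                    "its default value is used instead. Please update your code."
                )
                return value.default_value
            return value

        args = tuple(resolve(a) for a in args)
        kwargs = {k: resolve(v) for k, v in kwargs.items()}
        return func(*args, **kwargs)

    return wrapper
```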
- sagemaker.core.workflow.utilities.step_compilation_context_manager(pipeline_name: str, step_name: str, sagemaker_session, code_hash: str, config_hash: str, pipeline_definition_config: PipelineDefinitionConfig, upload_runtime_scripts: bool, upload_workspace: bool, pipeline_build_time: str, function_step_secret_token: str | None = None)[source]#
Expose static _pipeline_config variable to other modules
- Parameters:
pipeline_name (str) – pipeline name
step_name (str) – step name
sagemaker_session (sagemaker.core.helper.session.Session) – a sagemaker session
code_hash (str) – a hash of the code artifact for the particular step
config_hash (str) – a hash of the config artifact for the particular step (Processing)
pipeline_definition_config (PipelineDefinitionConfig) – a configuration used to toggle feature flags persistent in a pipeline definition
upload_runtime_scripts (bool) – flag used to manage upload of runtime scripts to s3 for a _FunctionStep in pipeline
upload_workspace (bool) – flag used to manage the upload of workspace to s3 for a _FunctionStep in pipeline
pipeline_build_time (str) – timestamp when the pipeline is being built
function_step_secret_token (str) – secret token used for the function step checksum
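The context-manager pattern, together with load_step_compilation_context above, can be sketched as follows; `SimpleNamespace` stands in for the internal `_PipelineConfig` class, and the exact fields are assumptions:

```python
from contextlib import contextmanager
from types import SimpleNamespace

# Module-level static shared with other modules (stands in for _pipeline_config).
_pipeline_config = None


@contextmanager
def step_compilation_context_manager(pipeline_name, step_name, **kwargs):
    # Publish the current step's compilation context for the duration of the block.
    global _pipeline_config
    _pipeline_config = SimpleNamespace(
        pipeline_name=pipeline_name, step_name=step_name, **kwargs
    )
    try:
        yield _pipeline_config
    finally:
        _pipeline_config = None  # clear once the step finishes compiling


def load_step_compilation_context():
    # Companion accessor: other modules read the static variable through this.
    return _pipeline_config
```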
- sagemaker.core.workflow.utilities.trim_request_dict(request_dict, job_key, config)[source]#
Trim unwanted fields from request_dict so they are not persisted in step arguments
Trims the job_name field off request_dict in cases where we do not want to include it in the pipeline definition.
- Parameters:
request_dict (dict) – A dictionary used to build the arguments for a pipeline step, containing fields that will be passed to job client during orchestration.
job_key (str) – The key in a step’s arguments to look up the base_job_name if it exists
config (_PipelineConfig) – the current step compilation context
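A minimal sketch of the trimming, assuming a feature flag controls whether the job name survives; `use_custom_job_prefix` is an assumed flag name, not necessarily the real attribute:

```python
def trim_request_dict(request_dict, job_key, config):
    # Drop the generated job name so the pipeline definition stays stable across
    # builds, unless the (assumed) feature flag asks to keep custom job prefixes.
    if not getattr(config, "use_custom_job_prefix", False):
        request_dict.pop(job_key, None)
    return request_dict
```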
- sagemaker.core.workflow.utilities.validate_step_args_input(step_args: _StepArguments, expected_caller: Set[str], error_message: str)[source]#
Validate the _StepArguments object which is passed into a pipeline step
- Parameters:
step_args (_StepArguments) – A _StepArguments object to be used for composing a pipeline step.
expected_caller (Set[str]) – The expected name of the caller function which is intercepted by the PipelineSession to get the step arguments.
error_message (str) – The error message to be thrown if the validation fails.
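A sketch of the validation, assuming `_StepArguments` records the name of the intercepted job function; `caller_name` and the ValueError are assumptions about the internal shape:

```python
def validate_step_args_input(step_args, expected_caller, error_message):
    # `caller_name` is an assumed attribute recording which job function
    # (e.g. "run", "fit", "transform") produced the step arguments.
    if getattr(step_args, "caller_name", None) not in expected_caller:
        raise ValueError(error_message)
```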