SageMaker MLOps

Contents

SageMaker MLOps#

MLOps capabilities including pipelines, workflows, and model management.

Pipeline Management#

SageMaker MLOps package for workflow orchestration and model building.

This package provides high-level orchestration capabilities for SageMaker workflows, including pipeline definitions, step implementations, and model building utilities.

The MLOps package sits at the top of the dependency hierarchy and can import from: - sagemaker.core (foundation primitives) - sagemaker.train (training functionality) - sagemaker.serve (serving functionality)

Key components: - workflow: Pipeline and step orchestration - model_builder: Model building and orchestration

Example usage:

from sagemaker.mlops import ModelBuilder from sagemaker.mlops.workflow import Pipeline, TrainingStep

class sagemaker.mlops.ModelBuilder(model: object | str | ~sagemaker.train.model_trainer.ModelTrainer | ~sagemaker.train.base_trainer.BaseTrainer | ~sagemaker.core.resources.TrainingJob | ~sagemaker.core.resources.ModelPackage | ~typing.List[~sagemaker.core.resources.Model] | None = None, model_path: str | None = <factory>, inference_spec: ~sagemaker.serve.spec.inference_spec.InferenceSpec | None = None, schema_builder: ~sagemaker.serve.builder.schema_builder.SchemaBuilder | None = None, modelbuilder_list: ~typing.List[~sagemaker.serve.model_builder.ModelBuilder] | None = None, role_arn: str | None = None, sagemaker_session: ~sagemaker.core.helper.session_helper.Session | None = None, image_uri: str | ~sagemaker.core.helper.pipeline_variable.PipelineVariable | None = None, s3_model_data_url: str | ~sagemaker.core.helper.pipeline_variable.PipelineVariable | ~typing.Dict[str, ~typing.Any] | None = None, source_code: ~sagemaker.core.training.configs.SourceCode | None = None, env_vars: ~typing.Dict[str, str | ~sagemaker.core.helper.pipeline_variable.PipelineVariable] | None = <factory>, model_server: ~sagemaker.serve.utils.types.ModelServer | None = None, model_metadata: ~typing.Dict[str, ~typing.Any] | None = None, log_level: int | None = 10, content_type: str | None = None, accept_type: str | None = None, compute: ~sagemaker.core.training.configs.Compute | None = None, network: ~sagemaker.core.training.configs.Networking | None = None, instance_type: str | None = None, mode: ~sagemaker.serve.mode.function_pointers.Mode | None = Mode.SAGEMAKER_ENDPOINT, shared_libs: ~typing.List[str] = <factory>, dependencies: ~typing.Dict[str, ~typing.Any] | None = <factory>, image_config: ~typing.Dict[str, str | ~sagemaker.core.helper.pipeline_variable.PipelineVariable] | None = None)[source]#

Bases: _InferenceRecommenderMixin, _ModelBuilderServers, _ModelBuilderUtils

Unified interface for building and deploying machine learning models.

ModelBuilder provides a streamlined workflow for preparing and deploying ML models to Amazon SageMaker. It supports multiple frameworks (PyTorch, TensorFlow, HuggingFace, etc.), model servers (TorchServe, TGI, Triton, etc.), and deployment modes (SageMaker endpoints, local containers, in-process).

The typical workflow involves three steps: 1. Initialize ModelBuilder with your model and configuration 2. Call build() to create a deployable Model resource 3. Call deploy() to create an Endpoint resource for inference

Example

>>> from sagemaker.serve.model_builder import ModelBuilder
>>> from sagemaker.serve.mode.function_pointers import Mode
>>>
>>> # Initialize with a trained model
>>> model_builder = ModelBuilder(
...     model=my_pytorch_model,
...     role_arn="arn:aws:iam::123456789012:role/SageMakerRole",
...     instance_type="ml.m5.xlarge"
... )
>>>
>>> # Build the model (creates SageMaker Model resource)
>>> model = model_builder.build()
>>>
>>> # Deploy to endpoint (creates SageMaker Endpoint resource)
>>> endpoint = model_builder.deploy(endpoint_name="my-endpoint")
>>>
>>> # Make predictions
>>> result = endpoint.invoke(data=input_data)
Parameters:
  • model – The model to deploy. Can be a trained model object, ModelTrainer, TrainingJob, ModelPackage, or JumpStart model ID string. Either model or inference_spec is required.

  • model_path – Local directory path where model artifacts are stored or will be downloaded.

  • inference_spec – Custom inference specification with load() and invoke() functions.

  • schema_builder – Defines input/output schema for serialization and deserialization.

  • modelbuilder_list – List of ModelBuilder objects for multi-model deployments.

  • pipeline_models – List of Model objects for creating inference pipelines.

  • role_arn – IAM role ARN for SageMaker to assume.

  • sagemaker_session – Session object for managing SageMaker API interactions.

  • image_uri – Container image URI. Auto-selected if not specified.

  • s3_model_data_url – S3 URI where model artifacts are stored or will be uploaded.

  • source_code – Source code configuration for custom inference code.

  • env_vars – Environment variables to set in the container.

  • model_server – Model server to use (TORCHSERVE, TGI, TRITON, etc.).

  • model_metadata – Dictionary to override model metadata (HF_TASK, MLFLOW_MODEL_PATH, etc.).

  • log_level – Logging level for ModelBuilder operations (default: logging.DEBUG).

  • content_type – MIME type of input data. Auto-derived from schema_builder if provided.

  • accept_type – MIME type of output data. Auto-derived from schema_builder if provided.

  • compute – Compute configuration specifying instance type and count.

  • network – Network configuration including VPC settings and network isolation.

  • instance_type – EC2 instance type for deployment (e.g., ‘ml.m5.large’).

  • mode – Deployment mode (SAGEMAKER_ENDPOINT, LOCAL_CONTAINER, or IN_PROCESS).

Note

ModelBuilder returns sagemaker.core.resources.Model and sagemaker.core.resources.Endpoint objects, not the deprecated PySDK Model and Predictor classes. Use endpoint.invoke() instead of predictor.predict() for inference.

accept_type: str | None = None#
build(model_name: str | None = None, mode: Mode | None = None, role_arn: str | None = None, sagemaker_session: Session | None = None, region: str | None = None) Model | ModelBuilder | None[source]#

Build a deployable Model instance with ModelBuilder.

Creates a SageMaker Model resource with the appropriate container image, model artifacts, and configuration. This method prepares the model for deployment but does not deploy it to an endpoint. Use the deploy() method to create an endpoint.

Note: This returns a sagemaker.core.resources.Model object, not the deprecated PySDK Model class.

Parameters:
  • model_name (str, optional) – The name for the SageMaker model. If not specified, a unique name will be generated. (Default: None).

  • mode (Mode, optional) – The mode of operation. Options are SAGEMAKER_ENDPOINT, LOCAL_CONTAINER, or IN_PROCESS. (Default: None, uses mode from initialization).

  • role_arn (str, optional) – The IAM role ARN for SageMaker to assume when creating the model and endpoint. (Default: None).

  • sagemaker_session (Session, optional) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, uses the session from initialization or creates one using the default AWS configuration chain. (Default: None).

  • region (str, optional) – The AWS region for deployment. If specified and different from the current region, a new session will be created. (Default: None).

Returns:

A sagemaker.core.resources.Model resource

that represents the created SageMaker model, or a ModelBuilder instance for multi-model scenarios.

Return type:

Union[Model, ModelBuilder, None]

Example

>>> model_builder = ModelBuilder(model=my_model, role_arn=role)
>>> model = model_builder.build()  # Creates Model resource
>>> endpoint = model_builder.deploy()  # Creates Endpoint resource
>>> result = endpoint.invoke(data=input_data)
compute: Compute | None = None#
configure_for_torchserve(shared_libs: List[str] | None = None, dependencies: Dict[str, Any] | None = None, image_config: Dict[str, str | PipelineVariable] | None = None) ModelBuilder[source]#

Configure ModelBuilder for TorchServe deployment.

content_type: str | None = None#
dependencies: Dict[str, Any] | None#
deploy(endpoint_name: str = None, initial_instance_count: int | None = 1, instance_type: str | None = None, wait: bool = True, update_endpoint: bool | None = False, container_timeout_in_seconds: int = 300, inference_config: ServerlessInferenceConfig | AsyncInferenceConfig | BatchTransformInferenceConfig | ResourceRequirements | None = None, custom_orchestrator_instance_type: str = None, custom_orchestrator_initial_instance_count: int = None, **kwargs) Endpoint | LocalEndpoint | Transformer[source]#

Deploy the built model to an Endpoint.

Creates a SageMaker EndpointConfig and deploys an Endpoint resource from the model created by build(). The model must be built before calling deploy().

Note: This returns a sagemaker.core.resources.Endpoint object, not the deprecated PySDK Predictor class. Use endpoint.invoke() to make predictions.

Parameters:
  • endpoint_name (str) – The name of the endpoint to create. If not specified, a unique endpoint name will be created. (Default: “endpoint”).

  • initial_instance_count (int, optional) – The initial number of instances to run in the endpoint. Required for instance-based endpoints. (Default: 1).

  • instance_type (str, optional) – The EC2 instance type to deploy this model to. For example, ‘ml.p2.xlarge’. Required for instance-based endpoints unless using serverless inference. (Default: None).

  • wait (bool) – Whether the call should wait until the deployment completes. (Default: True).

  • update_endpoint (bool) – Flag to update the model in an existing Amazon SageMaker endpoint. If True, deploys a new EndpointConfig to an existing endpoint and deletes resources from the previous EndpointConfig. (Default: False).

  • container_timeout_in_seconds (int) – The timeout value, in seconds, for the container to respond to requests. (Default: 300).

  • (Union[ServerlessInferenceConfig (inference_config) – BatchTransformInferenceConfig, ResourceRequirements], optional): Unified inference configuration parameter. Can be used instead of individual config parameters. (Default: None).

  • AsyncInferenceConfig – BatchTransformInferenceConfig, ResourceRequirements], optional): Unified inference configuration parameter. Can be used instead of individual config parameters. (Default: None).

:paramBatchTransformInferenceConfig, ResourceRequirements], optional): Unified inference

configuration parameter. Can be used instead of individual config parameters. (Default: None).

Parameters:
  • custom_orchestrator_instance_type (str, optional) – Instance type for custom orchestrator deployment. (Default: None).

  • custom_orchestrator_initial_instance_count (int, optional) – Initial instance count for custom orchestrator deployment. (Default: None).

Returns:

A sagemaker.core.resources.Endpoint

resource representing the deployed endpoint, a LocalEndpoint for local mode, or a Transformer for batch transform inference.

Return type:

Union[Endpoint, LocalEndpoint, Transformer]

Example

>>> model_builder = ModelBuilder(model=my_model, role_arn=role, instance_type="ml.m5.xlarge")
>>> model = model_builder.build()  # Creates Model resource
>>> endpoint = model_builder.deploy(endpoint_name="my-endpoint")  # Creates Endpoint resource
>>> result = endpoint.invoke(data=input_data)  # Make predictions
deploy_local(endpoint_name: str = 'endpoint', container_timeout_in_seconds: int = 300, **kwargs) LocalEndpoint[source]#

Deploy the built model to local mode for testing.

Deploys the model locally using either LOCAL_CONTAINER mode (runs in a Docker container) or IN_PROCESS mode (runs in the current Python process). This is useful for testing and development before deploying to SageMaker endpoints. The model must be built with mode=Mode.LOCAL_CONTAINER or mode=Mode.IN_PROCESS before calling this method.

Note: This returns a LocalEndpoint object for local inference, not a SageMaker Endpoint resource. Use local_endpoint.invoke() to make predictions.

Parameters:
  • endpoint_name (str) – The name for the local endpoint. (Default: “endpoint”).

  • container_timeout_in_seconds (int) – The timeout value, in seconds, for the container to respond to requests. (Default: 300).

Returns:

A LocalEndpoint object for making local predictions.

Return type:

LocalEndpoint

Raises:

ValueError – If the model was not built with LOCAL_CONTAINER or IN_PROCESS mode.

Example

>>> model_builder = ModelBuilder(
...     model=my_model,
...     role_arn=role,
...     mode=Mode.LOCAL_CONTAINER
... )
>>> model = model_builder.build()
>>> local_endpoint = model_builder.deploy_local()
>>> result = local_endpoint.invoke(data=input_data)
display_benchmark_metrics(**kwargs) None[source]#

Display benchmark metrics for JumpStart models.

enable_network_isolation()[source]#

Whether to enable network isolation when creating this Model

Returns:

If network isolation should be enabled or not.

Return type:

bool

env_vars: Dict[str, str | PipelineVariable] | None#
fetch_endpoint_names_for_base_model() Set[str][source]#

Fetches endpoint names for the base model.

Returns:

Set of endpoint names for the base model.

classmethod from_jumpstart_config(jumpstart_config: JumpStartConfig, role_arn: str | None = None, compute: Compute | None = None, network: Networking | None = None, image_uri: str | None = None, env_vars: Dict[str, str] | None = None, model_kms_key: str | None = None, resource_requirements: ResourceRequirements | None = None, tolerate_vulnerable_model: bool | None = None, tolerate_deprecated_model: bool | None = None, sagemaker_session: Session | None = None, schema_builder: SchemaBuilder | None = None) ModelBuilder[source]#

Create a ModelBuilder instance from a JumpStart configuration.

This class method provides a convenient way to create a ModelBuilder for deploying pre-trained models from Amazon SageMaker JumpStart. It automatically retrieves the appropriate model artifacts, container images, and default configurations for the specified JumpStart model.

Parameters:
  • jumpstart_config (JumpStartConfig) – Configuration object specifying the JumpStart model to use. Must include model_id and optionally model_version and inference_config_name.

  • role_arn (str, optional) – The IAM role ARN for SageMaker to assume when creating the model and endpoint. If not specified, attempts to use the default SageMaker execution role. (Default: None).

  • compute (Compute, optional) – Compute configuration specifying instance type and instance count for deployment. For example, Compute(instance_type=’ml.g5.xlarge’, instance_count=1). (Default: None).

  • network (Networking, optional) – Network configuration including VPC settings and network isolation. For example, Networking(vpc_config={‘Subnets’: […], ‘SecurityGroupIds’: […]}, enable_network_isolation=False). (Default: None).

  • image_uri (str, optional) – Custom container image URI. If not specified, uses the default JumpStart container image for the model. (Default: None).

  • env_vars (Dict[str, str], optional) – Environment variables to set in the container. These will be merged with default JumpStart environment variables. (Default: None).

  • model_kms_key (str, optional) – KMS key ARN used to encrypt model artifacts when uploading to S3. (Default: None).

  • resource_requirements (ResourceRequirements, optional) – The compute resource requirements for deploying the model to an inference component based endpoint. (Default: None).

  • tolerate_vulnerable_model (bool, optional) – If True, allows deployment of models with known security vulnerabilities. Use with caution. (Default: None).

  • tolerate_deprecated_model (bool, optional) – If True, allows deployment of deprecated JumpStart models. (Default: None).

  • sagemaker_session (Session, optional) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, creates one using the default AWS configuration chain. (Default: None).

  • schema_builder (SchemaBuilder, optional) – Schema builder for defining input/output schemas. If not specified, uses default schemas for the JumpStart model. (Default: None).

Returns:

A configured ModelBuilder instance ready to build and deploy

the specified JumpStart model.

Return type:

ModelBuilder

Example

>>> from sagemaker.core.jumpstart.configs import JumpStartConfig
>>> from sagemaker.serve.model_builder import ModelBuilder
>>>
>>> js_config = JumpStartConfig(
...     model_id="huggingface-llm-mistral-7b",
...     model_version="*"
... )
>>>
>>> from sagemaker.core.training.configs import Compute
>>>
>>> model_builder = ModelBuilder.from_jumpstart_config(
...     jumpstart_config=js_config,
...     compute=Compute(instance_type="ml.g5.2xlarge", instance_count=1)
... )
>>>
>>> model = model_builder.build()  # Creates Model resource
>>> endpoint = model_builder.deploy()  # Creates Endpoint resource
>>> result = endpoint.invoke(data=input_data)  # Make predictions
get_deployment_config() Dict[str, Any] | None[source]#

Gets the deployment config to apply to the model.

image_config: Dict[str, str | PipelineVariable] | None = None#
image_uri: str | PipelineVariable | None = None#
inference_spec: InferenceSpec | None = None#
instance_type: str | None = None#
is_repack() bool[source]#

Whether the source code needs to be repacked before uploading to S3.

Returns:

if the source need to be repacked or not

Return type:

bool

list_deployment_configs() List[Dict[str, Any]][source]#

List deployment configs for the model in the current region.

log_level: int | None = 10#
mode: Mode | None = 3#
model: object | str | ModelTrainer | BaseTrainer | TrainingJob | ModelPackage | List[Model] | None = None#
model_metadata: Dict[str, Any] | None = None#
model_path: str | None#
model_server: ModelServer | None = None#
modelbuilder_list: List[ModelBuilder] | None = None#
network: Networking | None = None#
optimize(model_name: str | None = 'optimize_model', output_path: str | None = None, instance_type: str | None = None, role_arn: str | None = None, sagemaker_session: Session | None = None, region: str | None = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, job_name: str | None = None, accept_eula: bool | None = None, quantization_config: Dict | None = None, compilation_config: Dict | None = None, speculative_decoding_config: Dict | None = None, sharding_config: Dict | None = None, env_vars: Dict | None = None, vpc_config: Dict | None = None, kms_key: str | None = None, image_uri: str | None = None, max_runtime_in_sec: int | None = 36000) Model[source]#

Create an optimized deployable Model instance with ModelBuilder.

Runs a SageMaker model optimization job to quantize, compile, or shard the model for improved inference performance. Returns a Model resource that can be deployed using the deploy() method.

Note: This returns a sagemaker.core.resources.Model object.

Parameters:
  • output_path (str, optional) – S3 URI where the optimized model artifacts will be stored. If not specified, uses the default output path. (Default: None).

  • instance_type (str, optional) – Target deployment instance type that the model is optimized for. For example, ‘ml.p4d.24xlarge’. (Default: None).

  • role_arn (str, optional) – IAM execution role ARN for the optimization job. If not specified, uses the role from initialization. (Default: None).

  • sagemaker_session (Session, optional) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, uses the session from initialization or creates one using the default AWS configuration chain. (Default: None).

  • region (str, optional) – The AWS region for the optimization job. If specified and different from the current region, a new session will be created. (Default: None).

  • model_name (str, optional) – The name for the optimized SageMaker model. If not specified, a unique name will be generated. (Default: None).

  • tags (Tags, optional) – Tags for labeling the model optimization job. (Default: None).

  • job_name (str, optional) – The name of the model optimization job. If not specified, a unique name will be generated. (Default: None).

  • accept_eula (bool, optional) – For models that require a Model Access Config, specify True or False to indicate whether model terms of use have been accepted. The accept_eula value must be explicitly defined as True in order to accept the end-user license agreement (EULA) that some models require. (Default: None).

  • quantization_config (Dict, optional) – Quantization configuration specifying the quantization method and parameters. For example: {‘OverrideEnvironment’: {‘OPTION_QUANTIZE’: ‘awq’}}. (Default: None).

  • compilation_config (Dict, optional) – Compilation configuration for optimizing the model for specific hardware. (Default: None).

  • speculative_decoding_config (Dict, optional) – Speculative decoding configuration for improving inference latency of large language models. (Default: None).

  • sharding_config (Dict, optional) – Model sharding configuration for distributing large models across multiple devices. (Default: None).

  • env_vars (Dict, optional) – Additional environment variables to pass to the optimization container. (Default: None).

  • vpc_config (Dict, optional) – VPC configuration for the optimization job. Should contain ‘Subnets’ and ‘SecurityGroupIds’ keys. (Default: None).

  • kms_key (str, optional) – KMS key ARN used to encrypt the optimized model artifacts when uploading to S3. (Default: None).

  • image_uri (str, optional) – Custom container image URI for the optimization job. If not specified, uses the default optimization container. (Default: None).

  • max_runtime_in_sec (int) – Maximum job execution time in seconds. The optimization job will be stopped if it exceeds this time. (Default: 36000).

Returns:

A sagemaker.core.resources.Model resource containing the optimized

model artifacts, ready for deployment.

Return type:

Model

Example

>>> model_builder = ModelBuilder(model=my_model, role_arn=role)
>>> optimized_model = model_builder.optimize(
...     instance_type="ml.g5.xlarge",
...     quantization_config={'OverrideEnvironment': {'OPTION_QUANTIZE': 'awq'}}
... )
>>> endpoint = model_builder.deploy()  # Deploy the optimized model
>>> result = endpoint.invoke(data=input_data)
register(model_package_name: str | PipelineVariable | None = None, model_package_group_name: str | PipelineVariable | None = None, content_types: List[str | PipelineVariable] = None, response_types: List[str | PipelineVariable] = None, inference_instances: List[str | PipelineVariable] | None = None, transform_instances: List[str | PipelineVariable] | None = None, model_metrics: ModelMetrics | None = None, metadata_properties: MetadataProperties | None = None, marketplace_cert: bool = False, approval_status: str | PipelineVariable | None = None, description: str | None = None, drift_check_baselines: DriftCheckBaselines | None = None, customer_metadata_properties: Dict[str, str | PipelineVariable] | None = None, validation_specification: str | PipelineVariable | None = None, domain: str | PipelineVariable | None = None, task: str | PipelineVariable | None = None, sample_payload_url: str | PipelineVariable | None = None, nearest_model_name: str | PipelineVariable | None = None, data_input_configuration: str | PipelineVariable | None = None, skip_model_validation: str | PipelineVariable | None = None, source_uri: str | PipelineVariable | None = None, model_card: ModelPackageModelCard | ModelCard | None = None, model_life_cycle: ModelLifeCycle | None = None, accept_eula: bool | None = None, model_type: JumpStartModelType | None = None) ModelPackage | ModelPackageGroup[source]#

Creates a model package for creating SageMaker models or listing on Marketplace.

Parameters:
  • content_types (list[str] or list[PipelineVariable]) – The supported MIME types for the input data.

  • response_types (list[str] or list[PipelineVariable]) – The supported MIME types for the output data.

  • inference_instances (list[str] or list[PipelineVariable]) – A list of the instance types that are used to generate inferences in real-time (default: None).

  • transform_instances (list[str] or list[PipelineVariable]) – A list of the instance types on which a transformation job can be run or on which an endpoint can be deployed (default: None).

  • model_package_name (str or PipelineVariable) – Model Package name, exclusive to model_package_group_name, using model_package_name makes the Model Package un-versioned (default: None).

  • model_package_group_name (str or PipelineVariable) – Model Package Group name, exclusive to model_package_name, using model_package_group_name makes the Model Package versioned (default: None).

  • model_metrics (ModelMetrics) – ModelMetrics object (default: None).

  • metadata_properties (MetadataProperties) – MetadataProperties object (default: None).

  • marketplace_cert (bool) – A boolean value indicating if the Model Package is certified for AWS Marketplace (default: False).

  • approval_status (str or PipelineVariable) – Model Approval Status, values can be “Approved”, “Rejected”, or “PendingManualApproval” (default: “PendingManualApproval”).

  • description (str) – Model Package description (default: None).

  • drift_check_baselines (DriftCheckBaselines) – DriftCheckBaselines object (default: None).

  • customer_metadata_properties (dict[str, str] or dict[str, PipelineVariable]) – A dictionary of key-value paired metadata properties (default: None).

  • domain (str or PipelineVariable) – Domain values can be “COMPUTER_VISION”, “NATURAL_LANGUAGE_PROCESSING”, “MACHINE_LEARNING” (default: None).

  • task (str or PipelineVariable) – Task values which are supported by Inference Recommender are “FILL_MASK”, “IMAGE_CLASSIFICATION”, “OBJECT_DETECTION”, “TEXT_GENERATION”, “IMAGE_SEGMENTATION”, “CLASSIFICATION”, “REGRESSION”, “OTHER” (default: None).

  • sample_payload_url (str or PipelineVariable) – The S3 path where the sample payload is stored (default: None).

  • nearest_model_name (str or PipelineVariable) – Name of a pre-trained machine learning benchmarked by Amazon SageMaker Inference Recommender (default: None).

  • data_input_configuration (str or PipelineVariable) – Input object for the model (default: None).

  • skip_model_validation (str or PipelineVariable) – Indicates if you want to skip model validation. Values can be “All” or “None” (default: None).

  • source_uri (str or PipelineVariable) – The URI of the source for the model package (default: None).

  • model_card (ModeCard or ModelPackageModelCard) – document contains qualitative and quantitative information about a model (default: None).

  • model_life_cycle (ModelLifeCycle) – ModelLifeCycle object (default: None).

  • accept_eula (bool) – For models that require a Model Access Config, specify True or False to indicate whether model terms of use have been accepted (default: None).

  • model_type (JumpStartModelType) – Type of JumpStart model (default: None).

Returns:

A sagemaker.model.ModelPackage instance or pipeline step arguments in case the Model instance is built with PipelineSession

Note

The following parameters are inherited from ModelBuilder.__init__ and do not need to be passed to register(): - image_uri: Use self.image_uri (defined in __init__) - framework: Use self.framework (defined in __init__) - framework_version: Use self.framework_version (defined in __init__)

role_arn: str | None = None#
s3_model_data_url: str | PipelineVariable | Dict[str, Any] | None = None#
sagemaker_session: Session | None = None#
schema_builder: SchemaBuilder | None = None#
set_deployment_config(config_name: str, instance_type: str) None[source]#

Sets the deployment config to apply to the model.

shared_libs: List[str]#
source_code: SourceCode | None = None#
to_string(obj: object)[source]#

Convert an object to string

This helper function handles converting PipelineVariable object to string as well

Parameters:

obj (object) – The object to be converted

transformer(instance_count, instance_type, strategy=None, assemble_with=None, output_path=None, output_kms_key=None, accept=None, env=None, max_concurrent_transforms=None, max_payload=None, tags=None, volume_kms_key=None)[source]#

Return a Transformer that uses this Model.

Parameters:
  • instance_count (int) – Number of EC2 instances to use.

  • instance_type (str) – Type of EC2 instance to use, for example, ‘ml.c4.xlarge’.

  • strategy (str) – The strategy used to decide how to batch records in a single request (default: None). Valid values: ‘MultiRecord’ and ‘SingleRecord’.

  • assemble_with (str) – How the output is assembled (default: None). Valid values: ‘Line’ or ‘None’.

  • output_path (str) – S3 location for saving the transform result. If not specified, results are stored to a default bucket.

  • output_kms_key (str) – Optional. KMS key ID for encrypting the transform output (default: None).

  • accept (str) – The accept header passed by the client to the inference endpoint. If it is supported by the endpoint, it will be the format of the batch transform output.

  • env (dict) – Environment variables to be set for use during the transform job (default: None).

  • max_concurrent_transforms (int) – The maximum number of HTTP requests to be made to each individual transform container at one time.

  • max_payload (int) – Maximum size of the payload in a single HTTP request to the container in MB.

  • tags (Optional[Tags]) – Tags for labeling a transform job. If none specified, then the tags used for the training job are used for the transform job.

  • volume_kms_key (str) – Optional. KMS key ID for encrypting the volume attached to the ML compute instance (default: None).

Workflow Management#

SageMaker workflow orchestration module.

This module provides pipeline and step orchestration capabilities for SageMaker workflows. It contains the high-level classes that orchestrate training, processing, and serving components from the train and serve packages.

Key components: - Pipeline: Main workflow orchestration class - Steps: Various step implementations (TrainingStep, ProcessingStep, etc.) - Configuration: Pipeline configuration classes - Utilities: Helper functions for workflow management

Note: This module imports from sagemaker.core.workflow for primitives (entities, parameters, functions, conditions, properties) and can import from sagemaker.train and sagemaker.serve for orchestration purposes.

class sagemaker.mlops.workflow.AutoMLStep(name: str, step_args: _JobStepArguments, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

AutoMLStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_auto_ml_job.

NOTE: The CreateAutoMLJob request is not quite the

args list that workflow needs.

ModelDeployConfig and GenerateCandidateDefinitionsOnly

attribute cannot be included.

get_best_auto_ml_model_builder(role, sagemaker_session=None)[source]#

Get the best candidate model artifacts, image uri and env variables for the best model.

Parameters:
  • role (str) – An AWS IAM role (either name or full ARN). The Amazon SageMaker AutoML jobs and APIs that create Amazon SageMaker endpoints use this role to access training data and model artifacts.

  • sagemaker_session (sagemaker.core.helper.session.Session) –

    A SageMaker Session object, used for SageMaker interactions. If the best model will be used as part of ModelStep, then sagemaker_session should be class:~sagemaker.workflow.pipeline_context.PipelineSession. Example:

    model = Model(sagemaker_session=PipelineSession())
    model_step = ModelStep(step_args=model.register())
    

property properties#

A Properties object representing the DescribeAutoMLJobResponse data model.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration.

class sagemaker.mlops.workflow.CacheConfig(enable_caching: bool = False, expire_after=None)[source]#

Bases: object

Configuration class to enable caching in SageMaker Pipelines Workflows.

If caching is enabled, the pipeline attempts to find a previous execution of a Step that was called with the same arguments. Step caching only considers successful execution. If a successful previous execution is found, the pipeline propagates the values from the previous execution rather than recomputing the Step. When multiple successful executions exist within the timeout period, it uses the result for the most recent successful execution.

enable_caching#

To enable Step caching. Defaults to False.

Type:

bool

expire_after#

If Step caching is enabled, a timeout also needs to defined. It defines how old a previous execution can be to be considered for reuse. Value should be an ISO 8601 duration string. Defaults to None.

Examples:

'p30d' # 30 days
'P4DT12H' # 4 days and 12 hours
'T12H' # 12 hours
Type:

str

property config#

Configures Step caching for SageMaker Pipelines Workflows.

enable_caching: bool#
class sagemaker.mlops.workflow.CallbackOutput(output_name: str | None = None, output_type: CallbackOutputTypeEnum = CallbackOutputTypeEnum.String)[source]#

Bases: object

Output for a callback step.

output_name#

The output name

Type:

str

output_type#

The output type

Type:

CallbackOutputTypeEnum

expr(step_name) Dict[str, str][source]#

The ‘Get’ expression dict for a CallbackOutput.

output_name: str#
output_type: CallbackOutputTypeEnum#
to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Get the request structure for workflow service calls.

class sagemaker.mlops.workflow.CallbackStep(name: str, sqs_queue_url: str, inputs: dict, outputs: List[CallbackOutput], display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None)[source]#

Bases: Step

Callback step for workflow.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that is used to define the callback step.

property properties#

A Properties object representing the output parameters of the callback step.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration.

class sagemaker.mlops.workflow.CheckJobConfig(role, instance_count=1, instance_type='ml.m5.xlarge', volume_size_in_gb=30, volume_kms_key=None, output_kms_key=None, max_runtime_in_seconds=None, base_job_name=None, sagemaker_session=None, env=None, tags=None, network_config=None)[source]#

Bases: object

Check job config for QualityCheckStep and ClarifyCheckStep.

class sagemaker.mlops.workflow.ClarifyCheckStep(name: str, clarify_check_config: ClarifyCheckConfig, check_job_config: CheckJobConfig, skip_check: bool | PipelineVariable = False, fail_on_violation: bool | PipelineVariable = True, register_new_baseline: bool | PipelineVariable = False, model_package_group_name: str | PipelineVariable | None = None, supplied_baseline_constraints: str | PipelineVariable | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None)[source]#

Bases: Step

ClarifyCheckStep step for workflow.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that is used to define the ClarifyCheck step.

property properties#

A Properties object representing the output parameters of the ClarifyCheck step.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration etc.

class sagemaker.mlops.workflow.ConditionStep(name: str, depends_on: List[str | Step] | None = None, display_name: str | None = None, description: str | None = None, conditions: List[Condition] | None = None, if_steps: List[Step] | None = None, else_steps: List[Step] | None = None)[source]#

Bases: Step

Conditional step for pipelines to support conditional branching in the execution of steps.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that is used to define the conditional branching in the pipeline.

property properties#

A simple Properties object with Outcome as the only property

property step_only_arguments#

Argument dict pertaining to the step only, and not the if_steps or else_steps.

class sagemaker.mlops.workflow.ConfigurableRetryStep(name: str, step_type: StepTypeEnum, display_name: str | None = None, description: str | None = None, depends_on: List[str | Step | StepCollection] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: Step

ConfigurableRetryStep for SageMaker Pipelines Workflows.

add_retry_policy(retry_policy: RetryPolicy)[source]#

Add a policy to the current ConfigurableRetryStep retry policies list.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for ConfigurableRetryStep.

class sagemaker.mlops.workflow.EMRStep(name: str, display_name: str, description: str, cluster_id: str, step_config: EMRStepConfig, depends_on: List[str | Step] | None = None, cache_config: CacheConfig | None = None, cluster_config: Dict[str, Any] | None = None, execution_role_arn: str | None = None)[source]#

Bases: Step

EMR step for workflow.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that is used to call AddJobFlowSteps.

NOTE: The AddFlowJobSteps request is not quite the args list that workflow needs. The Name attribute in AddJobFlowSteps cannot be passed; it will be set during runtime. In addition to that, we will also need to include emr job inputs and output config.

property properties: Dict[str, Any] | List[Dict[str, Any]]#

A Properties object representing the EMR DescribeStepResponse model

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration.

class sagemaker.mlops.workflow.EMRStepConfig(jar, args: List[str] | None = None, main_class: str | None = None, properties: List[dict] | None = None, output_args: dict[str, str] | None = None)[source]#

Bases: object

Config for a Hadoop Jar step.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Convert EMRStepConfig object to request dict.

class sagemaker.mlops.workflow.FailStep(name: str, error_message: str | PipelineVariable | None = None, display_name: str | None = None, description: str | None = None, depends_on: List[str | Step] | None = None)[source]#

Bases: Step

FailStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to define the FailStep.

property properties#

A Properties object is not available for the FailStep.

Executing a FailStep will terminate the pipeline. FailStep properties should not be referenced.

class sagemaker.mlops.workflow.LambdaOutput(output_name: str | None = None, output_type: LambdaOutputTypeEnum = LambdaOutputTypeEnum.String)[source]#

Bases: object

Output for a lambdaback step.

output_name#

The output name

Type:

str

output_type#

The output type

Type:

LambdaOutputTypeEnum

expr(step_name) Dict[str, str][source]#

The ‘Get’ expression dict for a LambdaOutput.

output_name: str#
output_type: LambdaOutputTypeEnum#
to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Get the request structure for workflow service calls.

class sagemaker.mlops.workflow.LambdaStep(name: str, lambda_func: Lambda, display_name: str | None = None, description: str | None = None, inputs: dict | None = None, outputs: List[LambdaOutput] | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step | StepCollection] | None = None)[source]#

Bases: Step

Lambda step for workflow.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that is used to define the lambda step.

property properties#

A Properties object representing the output parameters of the lambda step.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration.

class sagemaker.mlops.workflow.ModelStep(name: str, step_args: _ModelStepArguments | Dict, depends_on: List[str | Step | StepCollection] | None = None, retry_policies: List[RetryPolicy] | Dict[str, List[RetryPolicy]] | None = None, display_name: str | None = None, description: str | None = None, repack_model_step_settings: Dict[str, any] | None = None)[source]#

Bases: ConfigurableRetryStep

ModelStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that are used to call the appropriate SageMaker API.

property properties#

A Properties object representing the appropriate SageMaker response data model.

class sagemaker.mlops.workflow.MonitorBatchTransformStep(name: str, transform_step_args: _JobStepArguments, monitor_configuration: QualityCheckConfig | ClarifyCheckConfig, check_job_configuration: CheckJobConfig, monitor_before_transform: bool = False, fail_on_violation: bool | PipelineVariable = True, supplied_baseline_statistics: str | PipelineVariable | None = None, supplied_baseline_constraints: str | PipelineVariable | None = None, display_name: str | None = None, description: str | None = None)[source]#

Bases: StepCollection

Creates a Transformer step with Quality or Clarify check step

Used to monitor the inputs and outputs of the batch transform job.

class sagemaker.mlops.workflow.NotebookJobStep(input_notebook: str, image_uri: str, kernel_name: str, name: str | None = None, display_name: str | None = None, description: str | None = None, notebook_job_name: str | None = None, role: str | None = None, s3_root_uri: str | None = None, parameters: Dict[str, str | PipelineVariable] | None = None, environment_variables: Dict[str, str | PipelineVariable] | None = None, initialization_script: str | None = None, s3_kms_key: str | PipelineVariable | None = None, instance_type: str | PipelineVariable | None = 'ml.m5.large', volume_size: int | PipelineVariable = 30, volume_kms_key: str | PipelineVariable | None = None, encrypt_inter_container_traffic: bool | PipelineVariable | None = True, security_group_ids: List[str | PipelineVariable] | None = None, subnets: List[str | PipelineVariable] | None = None, max_retry_attempts: int = 1, max_runtime_in_seconds: int = 172800, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, additional_dependencies: List[str] | None = None, retry_policies: List[RetryPolicy] | None = None, depends_on: List[Step | StepOutput] | None = None)[source]#

Bases: ConfigurableRetryStep

NotebookJobStep for SageMaker Pipelines Workflows.

For more details about SageMaker Notebook Jobs, see SageMaker Notebook Jobs.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

Generates the arguments dictionary that is used to create the job.

property depends_on: List[str | Step | StepCollection | StepOutput] | None#

The list of steps the current Step depends on.

property properties#

A Properties object representing the notebook job step output

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for workflow service calls.

class sagemaker.mlops.workflow.ParallelismConfiguration(max_parallel_execution_steps: int)[source]#

Bases: object

Parallelism config for SageMaker pipeline.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Returns: the request structure.

class sagemaker.mlops.workflow.Pipeline(name: str = '', parameters: ~typing.Sequence[~sagemaker.core.workflow.parameters.Parameter] | None = None, pipeline_experiment_config: ~sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfig | None = <sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfig object>, mlflow_config: ~sagemaker.core.shapes.shapes.MlflowConfig | None = None, steps: ~typing.Sequence[~sagemaker.mlops.workflow.steps.Step | ~sagemaker.core.workflow.step_outputs.StepOutput] | None = None, sagemaker_session: ~sagemaker.core.helper.session_helper.Session | None = None, pipeline_definition_config: ~sagemaker.core.workflow.pipeline_definition_config.PipelineDefinitionConfig | None = <sagemaker.core.workflow.pipeline_definition_config.PipelineDefinitionConfig object>)[source]#

Bases: object

Pipeline for workflow.

build_parameters_from_execution(pipeline_execution_arn: str, parameter_value_overrides: Dict[str, str | bool | int | float] | None = None) Dict[str, str | bool | int | float][source]#

Gets the parameters from an execution, update with optional parameter value overrides.

Parameters:
  • pipeline_execution_arn (str) – The arn of the reference pipeline execution.

  • parameter_value_overrides (Dict[str, Union[str, bool, int, float]]) – Parameter dict to be updated with the parameters from the referenced execution.

Returns:

A parameter dict built from an execution and provided parameter value overrides.

create(role_arn: str = None, description: str = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, parallelism_config: ParallelismConfiguration = None) Dict[str, Any][source]#

Creates a Pipeline in the Pipelines service.

Parameters:
  • role_arn (str) – The role arn that is assumed by the pipeline to create step artifacts.

  • description (str) – A description of the pipeline.

  • tags (Optional[Tags]) – Tags to be passed to the pipeline.

  • parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.

Returns:

A response dict from the service.

definition() str[source]#

Converts a request structure to string representation for workflow service calls.

Returns:

A JSON formatted string of pipeline definition.

delete() Dict[str, Any][source]#

Deletes a Pipeline in the Workflow service.

Returns:

A response dict from the service.

delete_triggers(trigger_names: List[str])[source]#

Delete Triggers for a parent SageMaker Pipeline if they exist.

Parameters:

trigger_names (List[str]) – List of trigger names to be deleted. Currently, these can only be EventBridge schedule names.

describe(pipeline_version_id: int | None = None) Dict[str, Any][source]#

Describes a Pipeline in the Workflow service.

Parameters:

pipeline_version_id (Optional[str]) – version ID of the pipeline to describe.

Returns:

Response dict from the service. See boto3 client documentation

describe_trigger(trigger_name: str) Dict[str, Any][source]#

Describe Trigger for a parent SageMaker Pipeline.

Parameters:

trigger_name (str) – Trigger name to be described. Currently, this can only be an EventBridge schedule name.

Returns:

Trigger describe responses from EventBridge.

Return type:

Dict[str, str]

property latest_pipeline_version_id#

Retrieves the latest version id of this pipeline

list_executions(sort_by: str | None = None, sort_order: str | None = None, max_results: int | None = None, next_token: str | None = None) Dict[str, Any][source]#

Lists a pipeline’s executions.

Parameters:
  • sort_by (str) – The field by which to sort results(CreationTime/PipelineExecutionArn).

  • sort_order (str) – The sort order for results (Ascending/Descending).

  • max_results (int) – The maximum number of pipeline executions to return in the response.

  • next_token (str) – If the result of the previous ListPipelineExecutions request was truncated, the response includes a NextToken. To retrieve the next set of pipeline executions, use the token in the next request.

Returns:

List of Pipeline Execution Summaries. See boto3 client list_pipeline_executions https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_pipeline_executions

list_pipeline_versions(sort_order: str | None = None, max_results: int | None = None, next_token: str | None = None) str[source]#

Lists a pipeline’s versions.

Parameters:
  • sort_order (str) – The sort order for results (Ascending/Descending).

  • max_results (int) – The maximum number of pipeline executions to return in the response.

  • next_token (str) – If the result of the previous ListPipelineExecutions request was truncated, the response includes a NextToken. To retrieve the next set of pipeline executions, use the token in the next request.

Returns:

List of Pipeline Version Summaries. See boto3 client list_pipeline_versions https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/list_pipeline_versions.html#

put_triggers(triggers: List[Trigger], role_arn: str | None = None) List[str][source]#

Attach triggers to a parent SageMaker Pipeline.

Parameters:
  • triggers (List[Trigger]) – List of supported triggers. Currently, this can only be of type PipelineSchedule.

  • role_arn (str) – The role arn that is assumed by EventBridge service.

Returns:

Successfully created trigger Arn(s). Currently, the pythonSDK only supports

PipelineSchedule triggers, thus, this is a list of EventBridge Schedule Arn(s) that were created/upserted.

Return type:

List[str]

start(parameters: Dict[str, str | bool | int | float] = None, execution_display_name: str = None, execution_description: str = None, parallelism_config: ParallelismConfiguration = None, selective_execution_config: SelectiveExecutionConfig = None, mlflow_experiment_name: str = None, pipeline_version_id: int = None)[source]#

Starts a Pipeline execution in the Workflow service.

Parameters:
  • parameters (Dict[str, Union[str, bool, int, float]]) – values to override pipeline parameters.

  • execution_display_name (str) – The display name of the pipeline execution.

  • execution_description (str) – A description of the execution.

  • parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.

  • selective_execution_config (Optional[SelectiveExecutionConfig]) – The configuration for selective step execution.

  • mlflow_experiment_name (str) – Optional MLflow experiment name to override the experiment name specified in the pipeline’s mlflow_config. If provided, this will override the experiment name for this specific pipeline execution only, without modifying the pipeline definition.

  • pipeline_version_id (Optional[str]) – version ID of the pipeline to start the execution from. If not specified, uses the latest version ID.

Returns:

A _PipelineExecution instance, if successful.

update(role_arn: str | None = None, description: str | None = None, parallelism_config: ParallelismConfiguration | None = None) Dict[str, Any][source]#

Updates a Pipeline in the Workflow service.

Parameters:
  • role_arn (str) – The role arn that is assumed by pipelines to create step artifacts.

  • description (str) – A description of the pipeline.

  • parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.

Returns:

A response dict from the service.

upsert(role_arn: str | None = None, description: str | None = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, parallelism_config: ParallelismConfiguration | None = None) Dict[str, Any][source]#

Creates a pipeline or updates it, if it already exists.

Parameters:
  • role_arn (str) – The role arn that is assumed by workflow to create step artifacts.

  • description (str) – A description of the pipeline.

  • tags (Optional[Tags]) – Tags to be passed.

  • steps (parallelism_config (Optional[Config for parallel) – is applied to each of the executions

  • that (Parallelism configuration) – is applied to each of the executions

Returns:

response dict from service

class sagemaker.mlops.workflow.PipelineExperimentConfig(experiment_name: str | Parameter | ExecutionVariable | PipelineVariable, trial_name: str | Parameter | ExecutionVariable | PipelineVariable)[source]#

Bases: Entity

Experiment config for SageMaker pipeline.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Returns: the request structure.

class sagemaker.mlops.workflow.PipelineExperimentConfigProperties[source]#

Bases: object

Enum-like class for all pipeline experiment config property references.

EXPERIMENT_NAME = <sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfigProperty object>#
TRIAL_NAME = <sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfigProperty object>#
class sagemaker.mlops.workflow.PipelineExperimentConfigProperty(name: str)[source]#

Bases: PipelineVariable

Reference to pipeline experiment config property.

property expr: Dict[str, Any] | List[Dict[str, Any]]#

The ‘Get’ expression dict for a pipeline experiment config property.

class sagemaker.mlops.workflow.PipelineGraph(steps: Sequence[Step])[source]#

Bases: object

Helper class representing the Pipeline Directed Acyclic Graph (DAG)

steps#

Sequence of `Step`s that represent each node in the pipeline DAG

Type:

Sequence[Step]

classmethod from_pipeline(pipeline: Pipeline)[source]#

Create a PipelineGraph object from the Pipeline object.

get_steps_in_sub_dag(current_step: Step, sub_dag_steps: Set[str] | None = None) Set[str][source]#

Get names of all steps (including current step) in the sub dag of current step.

Returns a set of step names in the sub dag.

is_cyclic() bool[source]#

Check if this pipeline graph is cyclic.

Returns true if it is cyclic, false otherwise.

class sagemaker.mlops.workflow.PipelineSchedule(name: str | None = None, enabled: bool | None = True, start_date: datetime | None = None, at: datetime | None = None, rate: tuple | None = None, cron: str | None = None)[source]#

Bases: Trigger

Pipeline Schedule trigger type used to create EventBridge Schedules for SageMaker Pipelines.

To create a pipeline schedule, specify a single type using the at, rate, or cron parameters. For more information about EventBridge syntax, see Schedule types on EventBridge Scheduler.

Parameters:
  • start_date (datetime) – The start date of the schedule. Default is time.now().

  • at (datetime) – An “At” EventBridge expression. Defaults to UTC timezone. Note that if you use datetime.now(), the result is a snapshot of your current local time. Eventbridge requires a time in UTC format. You can convert the result of datetime.now() to UTC by using datetime.utcnow() or datetime.now(tz=pytz.utc). For example, you can create a time two minutes from now with the expression datetime.now(tz=pytz.utc) + timedelta(0, 120).

  • rate (tuple) – A “Rate” EventBridge expression. Format is (value, unit).

  • cron (str) – A “Cron” EventBridge expression. Format is “minutes hours day-of-month month day-of-week year”.

  • name (str) – The schedule name. Default is None.

  • enabled (boolean) – If the schedule is enabled. Defaults to True.

at: datetime | None#
cron: str | None#
enabled: bool | None#
name: str | None#
rate: tuple | None#
resolve_schedule_expression() str[source]#

Resolve schedule expression

Format schedule expression for an EventBridge client call from the specified

at, rate, or cron parameter. After resolution, if there are any othererrors in the syntax, this will throw an expected ValidationException from EventBridge.

Returns:

Correctly string formatted schedule expression based on type.

Return type:

schedule_expression

start_date: datetime | None#
class sagemaker.mlops.workflow.ProcessingStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, property_files: List[PropertyFile] | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

ProcessingStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_processing_job.

NOTE: The CreateProcessingJob request is not quite the args list that workflow needs. ExperimentConfig cannot be included in the arguments.

property properties#

A Properties object representing the DescribeProcessingJobResponse data model.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Get the request structure for workflow service calls.

class sagemaker.mlops.workflow.QualityCheckConfig(baseline_dataset: str | PipelineVariable, dataset_format: dict, *, output_s3_uri: str | PipelineVariable | None = None, post_analytics_processor_script: str | None = None)[source]#

Bases: ABC

Quality Check Config.

baseline_dataset#

The path to the baseline_dataset file. This can be a local path or an S3 uri string

Type:

str or PipelineVariable

dataset_format#

The format of the baseline_dataset.

Type:

dict

output_s3_uri#

Desired S3 destination of the constraint_violations and statistics json files (default: None). If not specified an auto generated path will be used: “s3://<default_session_bucket>/model-monitor/baselining/<job_name>/results”

Type:

str or PipelineVariable

post_analytics_processor_script#

The path to the record post-analytics processor script (default: None). This can be a local path or an S3 uri string but CANNOT be any type of the PipelineVariable.

Type:

str

baseline_dataset: str | PipelineVariable#
dataset_format: dict#
output_s3_uri: str | PipelineVariable#
post_analytics_processor_script: str#
class sagemaker.mlops.workflow.QualityCheckStep(name: str, quality_check_config: QualityCheckConfig, check_job_config: CheckJobConfig, skip_check: bool | PipelineVariable = False, fail_on_violation: bool | PipelineVariable = True, register_new_baseline: bool | PipelineVariable = False, model_package_group_name: str | PipelineVariable | None = None, supplied_baseline_statistics: str | PipelineVariable | None = None, supplied_baseline_constraints: str | PipelineVariable | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None)[source]#

Bases: Step

QualityCheck step for workflow.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dict that is used to define the QualityCheck step.

property properties#

A Properties object representing the output parameters of the QualityCheck step.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration etc.

class sagemaker.mlops.workflow.RetryPolicy(backoff_rate: float = 2.0, interval_seconds: int = 1, max_attempts: int | None = None, expire_after_mins: int | None = None)[source]#

Bases: Entity

RetryPolicy base class

backoff_rate#

The multiplier by which the retry interval increases during each attempt (default: 2.0)

Type:

float

interval_seconds#

An integer that represents the number of seconds before the first retry attempt (default: 1)

Type:

int

max_attempts#

A positive integer that represents the maximum number of retry attempts. (default: None)

Type:

int

expire_after_mins#

A positive integer that represents the maximum minute to expire any further retry attempt (default: None)

Type:

int

backoff_rate: float#
expire_after_mins: int#
interval_seconds: int#
max_attempts: int#
to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Get the request structure for workflow service calls.

validate_backoff_rate(_, value)[source]#

Validate the input back off rate type

validate_expire_after_mins(_, value)[source]#

Validate expire after mins

validate_interval_seconds(_, value)[source]#

Validate the input interval seconds

validate_max_attempts(_, value)[source]#

Validate the input max attempts

class sagemaker.mlops.workflow.SageMakerJobExceptionTypeEnum(*args, value=<object object>, **kwargs)[source]#

Bases: Enum

SageMaker Job ExceptionType enum.

CAPACITY_ERROR = 'SageMaker.CAPACITY_ERROR'#
INTERNAL_ERROR = 'SageMaker.JOB_INTERNAL_ERROR'#
RESOURCE_LIMIT = 'SageMaker.RESOURCE_LIMIT'#
class sagemaker.mlops.workflow.SageMakerJobStepRetryPolicy(exception_types: List[SageMakerJobExceptionTypeEnum] | None = None, failure_reason_types: List[SageMakerJobExceptionTypeEnum] | None = None, backoff_rate: float = 2.0, interval_seconds: int = 1, max_attempts: int | None = None, expire_after_mins: int | None = None)[source]#

Bases: RetryPolicy

RetryPolicy for exception thrown by SageMaker Job.

exception_types#

The SageMaker exception to match for this policy. The SageMaker exceptions captured here are the exceptions thrown by synchronously creating the job. For instance the resource limit exception.

Type:

List[SageMakerJobExceptionTypeEnum]

failure_reason_types#

the SageMaker failure reason types to match for this policy. The failure reason type is presented in FailureReason field of the Describe response, it indicates the runtime failure reason for a job.

Type:

List[SageMakerJobExceptionTypeEnum]

backoff_rate#

The multiplier by which the retry interval increases during each attempt (default: 2.0)

Type:

float

interval_seconds#

An integer that represents the number of seconds before the first retry attempt (default: 1)

Type:

int

max_attempts#

A positive integer that represents the maximum number of retry attempts. (default: None)

Type:

int

expire_after_mins#

A positive integer that represents the maximum minute to expire any further retry attempt (default: None)

Type:

int

backoff_rate: float#
exception_type_list: List[SageMakerJobExceptionTypeEnum]#
expire_after_mins: int#
interval_seconds: int#
max_attempts: int#
to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for retry policy.

class sagemaker.mlops.workflow.SelectiveExecutionConfig(selected_steps: List[str], reference_latest_execution: bool = True, source_pipeline_execution_arn: str | None = None)[source]#

Bases: object

The selective execution configuration, which defines a subset of pipeline steps to run in

another SageMaker pipeline run.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Convert SelectiveExecutionConfig object to request dict.

class sagemaker.mlops.workflow.Step(name: str, display_name: str | None = None, description: str | None = None, step_type: StepTypeEnum = None, depends_on: List[str | Step | StepCollection | StepOutput] | None = None)[source]#

Bases: Entity

Pipeline Step for SageMaker Pipelines Workflows.

add_depends_on(step_names: List[str | Step | StepCollection | StepOutput])[source]#

Add Step names or Step instances to the current Step depends on list.

abstract property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments to the particular Step service call.

property depends_on: List[str | Step | StepCollection | StepOutput] | None#

The list of steps the current Step depends on.

abstract property properties#

The properties of the particular Step.

property ref: Dict[str, str]#

Gets a reference dictionary for Step instances.

property step_only_arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments to this Step only.

Compound Steps such as the ConditionStep will have to override this method to return arguments pertaining to only that step.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for workflow service calls.

class sagemaker.mlops.workflow.StepCollection(name: str, steps: List[Step] = NOTHING, depends_on: List[str | Step | StepCollection | StepOutput] = None)[source]#

Bases: object

A wrapper of pipeline steps for workflow.

name#

The name of the StepCollection.

Type:

str

steps#

A list of steps.

Type:

List[Step]

depends_on#

The list of Step/StepCollection names or Step/StepCollection/StepOutput instances that the current Step depends on.

Type:

List[Union[str, Step, StepCollection, StepOutput]]

depends_on: List[str | Step | StepCollection | StepOutput]#
name: str#
property properties#

The properties of the particular StepCollection.

request_dicts() List[Dict[str, Any] | List[Dict[str, Any]]][source]#

Get the request structure for workflow service calls.

steps: List[Step]#
class sagemaker.mlops.workflow.StepExceptionTypeEnum(*args, value=<object object>, **kwargs)[source]#

Bases: Enum

Step ExceptionType enum.

SERVICE_FAULT = 'Step.SERVICE_FAULT'#
THROTTLING = 'Step.THROTTLING'#
class sagemaker.mlops.workflow.StepRetryPolicy(exception_types: List[StepExceptionTypeEnum], backoff_rate: float = 2.0, interval_seconds: int = 1, max_attempts: int | None = None, expire_after_mins: int | None = None)[source]#

Bases: RetryPolicy

RetryPolicy for a retryable step. The pipeline service will retry

sagemaker.workflow.retry.StepRetryExceptionTypeEnum.SERVICE_FAULT and sagemaker.workflow.retry.StepRetryExceptionTypeEnum.THROTTLING regardless of pipeline step type by default. However, for step defined as retryable, you can override them by specifying a StepRetryPolicy.

exception_types#

the exception types to match for this policy

Type:

List[StepExceptionTypeEnum]

backoff_rate#

The multiplier by which the retry interval increases during each attempt (default: 2.0)

Type:

float

interval_seconds#

An integer that represents the number of seconds before the first retry attempt (default: 1)

Type:

int

max_attempts#

A positive integer that represents the maximum number of retry attempts. (default: None)

Type:

int

expire_after_mins#

A positive integer that represents the maximum minute to expire any further retry attempt (default: None)

Type:

int

backoff_rate: float#
expire_after_mins: int#
interval_seconds: int#
max_attempts: int#
to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Gets the request structure for retry policy.

class sagemaker.mlops.workflow.StepTypeEnum(value)[source]#

Bases: Enum

Enum of Step types.

AUTOML = 'AutoML'#
CALLBACK = 'Callback'#
CLARIFY_CHECK = 'ClarifyCheck'#
CONDITION = 'Condition'#
CREATE_MODEL = 'Model'#
EMR = 'EMR'#
EMR_SERVERLESS = 'EMRServerless'#
FAIL = 'Fail'#
LAMBDA = 'Lambda'#
PROCESSING = 'Processing'#
QUALITY_CHECK = 'QualityCheck'#
REGISTER_MODEL = 'RegisterModel'#
TRAINING = 'Training'#
TRANSFORM = 'Transform'#
TUNING = 'Tuning'#
class sagemaker.mlops.workflow.TrainingStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

TrainingStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_training_job.

NOTE: The CreateTrainingJob request is not quite the args list that workflow needs.

property properties#

A Properties object representing the DescribeTrainingJobResponse data model.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the request dictionary with cache configuration.

class sagemaker.mlops.workflow.TransformStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

TransformStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_transform_job.

NOTE: The CreateTransformJob request is not quite the args list that workflow needs. ExperimentConfig cannot be included in the arguments.

property properties#

A Properties object representing the DescribeTransformJobResponse data model.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration.

class sagemaker.mlops.workflow.Trigger(name: str | None = None, enabled: bool | None = True)[source]#

Bases: object

Abstract class representing a Pipeline Trigger

name#

The name of the trigger, default to pipeline_name.

Type:

str

enabled#

The state of the schedule, default True resolves to ‘ENABLED’.

Type:

boolean

enabled: bool | None#
name: str | None#
resolve_trigger_name(pipeline_name: str) str[source]#

Resolve the schedule name given a parent pipeline.

Parameters:

pipeline_name (str) – Parent pipeline name

Returns:

Resolved schedule name.

Return type:

str

resolve_trigger_state() str[source]#

Helper method for Enabled/Disabled Resolution on Trigger States

Returns:

ENABLED/DISABLED string literal

Return type:

(str)

class sagemaker.mlops.workflow.TuningStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#

Bases: ConfigurableRetryStep

TuningStep for SageMaker Pipelines Workflows.

property arguments: Dict[str, Any] | List[Dict[str, Any]]#

The arguments dictionary that is used to call create_hyper_parameter_tuning_job.

NOTE: The CreateHyperParameterTuningJob request is not quite the

args list that workflow needs.

get_top_model_s3_uri(top_k: int, s3_bucket: str, prefix: str = '') Join[source]#

Get the model artifact S3 URI from the top performing training jobs.

Parameters:
  • top_k (int) – The index of the top performing training job tuning step stores up to 50 top performing training jobs. A valid top_k value is from 0 to 49. The best training job model is at index 0.

  • s3_bucket (str) – The S3 bucket to store the training job output artifact.

  • prefix (str) – The S3 key prefix to store the training job output artifact.

property properties#

A Properties object

A Properties object representing DescribeHyperParameterTuningJobResponse and ListTrainingJobsForHyperParameterTuningJobResponse data model.

to_request() Dict[str, Any] | List[Dict[str, Any]][source]#

Updates the dictionary with cache configuration.

Local Development#

Local pipeline execution for SageMaker MLOps.

class sagemaker.mlops.local.LocalPipelineSession(*args, **kwargs)[source]#

Bases: LocalSession

Extends LocalSession with pipeline execution capabilities.

This class provides local pipeline execution functionality that was previously in LocalSession. It’s now in the MLOps package since pipeline orchestration is an MLOps concern.

Usage:

from sagemaker.mlops.local import LocalPipelineSession from sagemaker.mlops.workflow import Pipeline

session = LocalPipelineSession() session.create_pipeline(pipeline, “My pipeline”)

create_pipeline(pipeline, pipeline_description, **kwargs)[source]#

Create a local pipeline.

Parameters:
  • pipeline (Pipeline) – Pipeline object

  • pipeline_description (str) – Description of the pipeline

Returns:

Pipeline metadata (PipelineArn)

delete_pipeline(PipelineName)[source]#

Delete the local pipeline.

Parameters:

PipelineName (str) – Name of the pipeline

Returns:

Pipeline metadata (PipelineArn)

describe_pipeline(PipelineName)[source]#

Describe the pipeline.

Parameters:

PipelineName (str) – Name of the pipeline

Returns:

Pipeline metadata (PipelineArn, PipelineDefinition, LastModifiedTime, etc)

start_pipeline_execution(PipelineName, **kwargs)[source]#

Start the pipeline.

Parameters:

PipelineName (str) – Name of the pipeline

Returns:

_LocalPipelineExecution object

update_pipeline(pipeline, pipeline_description, **kwargs)[source]#

Update a local pipeline.

Parameters:
  • pipeline (Pipeline) – Pipeline object

  • pipeline_description (str) – Description of the pipeline

Returns:

Pipeline metadata (PipelineArn)

Feature Store#

SageMaker FeatureStore V3 - powered by sagemaker-core.

class sagemaker.mlops.feature_store.AthenaQuery(catalog: str, database: str, table_name: str, sagemaker_session: Session)[source]#

Bases: object

Class to manage querying of feature store data with AWS Athena.

This class instantiates a AthenaQuery object that is used to retrieve data from feature store via standard SQL queries.

catalog#

name of the data catalog.

Type:

str

database#

name of the database.

Type:

str

table_name#

name of the table.

Type:

str

sagemaker_session#

instance of the Session class to perform boto calls.

Type:

Session

as_dataframe(**kwargs) DataFrame[source]#

Download the result of the current query and load it into a DataFrame.

Parameters:

**kwargs (object) – key arguments used for the method pandas.read_csv to be able to have a better tuning on data. For more info read: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html

Returns:

A pandas DataFrame contains the query result.

catalog: str#
database: str#
get_query_execution() Dict[str, Any][source]#

Get execution status of the current query.

Returns:

Response dict from Athena.

run(query_string: str, output_location: str, kms_key: str = None, workgroup: str = None) str[source]#

Execute a SQL query given a query string, output location and kms key.

This method executes the SQL query using Athena and outputs the results to output_location and returns the execution id of the query.

Parameters:
  • query_string – SQL query string.

  • output_location – S3 URI of the query result.

  • kms_key – KMS key id. If set, will be used to encrypt the query result file.

  • workgroup (str) – The name of the workgroup in which the query is being started.

Returns:

Execution id of the query.

sagemaker_session: Session#
table_name: str#
wait()[source]#

Wait for the current query to finish.

class sagemaker.mlops.feature_store.CollectionTypeEnum(value)[source]#

Bases: Enum

Collection types: List, Set, or Vector.

LIST = 'List'#
SET = 'Set'#
VECTOR = 'Vector'#
class sagemaker.mlops.feature_store.DataCatalogConfig(*, table_name: str | PipelineVariable, catalog: str | PipelineVariable, database: str | PipelineVariable)[source]#

Bases: Base

The meta data of the Glue table which serves as data catalog for the OfflineStore.

table_name#
Type:

The name of the Glue table.

catalog#
Type:

The name of the Glue table catalog.

database#
Type:

The name of the Glue table database.

catalog: str | PipelineVariable#
database: str | PipelineVariable#
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

table_name: str | PipelineVariable#
class sagemaker.mlops.feature_store.DatasetBuilder(_sagemaker_session: Session, _base: FeatureGroup | DataFrame, _output_path: str, _record_identifier_feature_name: str | None = None, _event_time_identifier_feature_name: str | None = None, _included_feature_names: List[str] | None = None, _kms_key_id: str | None = None, _event_time_identifier_feature_type: FeatureTypeEnum | None = None)[source]#

Bases: object

DatasetBuilder definition.

This class instantiates a DatasetBuilder object that comprises a base, a list of feature names, an output path and a KMS key ID.

_sagemaker_session#

Session instance to perform boto calls.

Type:

Session

_base#

A base which can be either a FeatureGroup or a pandas.DataFrame and will be used to merge other FeatureGroups and generate a Dataset.

Type:

Union[FeatureGroup, DataFrame]

_output_path#

An S3 URI which stores the output .csv file.

Type:

str

_record_identifier_feature_name#

A string representing the record identifier feature if base is a DataFrame (default: None).

Type:

str

_event_time_identifier_feature_name#

A string representing the event time identifier feature if base is a DataFrame (default: None).

Type:

str

_included_feature_names#

A list of strings representing features to be included in the output. If not set, all features will be included in the output. (default: None).

Type:

List[str]

_kms_key_id#

A KMS key id. If set, will be used to encrypt the result file (default: None).

Type:

str

_point_in_time_accurate_join#

A boolean representing if point-in-time join is applied to the resulting dataframe when calling “to_dataframe”. When set to True, users can retrieve data using “row-level time travel” according to the event times provided to the DatasetBuilder. This requires that the entity dataframe with event times is submitted as the base in the constructor (default: False).

Type:

bool

_include_duplicated_records#

A boolean representing whether the resulting dataframe when calling “to_dataframe” should include duplicated records (default: False).

Type:

bool

_include_deleted_records#

A boolean representing whether the resulting dataframe when calling “to_dataframe” should include deleted records (default: False).

Type:

bool

_number_of_recent_records#

An integer representing how many records will be returned for each record identifier (default: 1).

Type:

int

_number_of_records#

An integer representing the number of records that should be returned in the resulting dataframe when calling “to_dataframe” (default: None).

Type:

int

_write_time_ending_timestamp#

A datetime that represents the latest write time for a record to be included in the resulting dataset. Records with a newer write time will be omitted from the resulting dataset. (default: None).

Type:

datetime.datetime

_event_time_starting_timestamp#

A datetime that represents the earliest event time for a record to be included in the resulting dataset. Records with an older event time will be omitted from the resulting dataset. (default: None).

Type:

datetime.datetime

_event_time_ending_timestamp#

A datetime that represents the latest event time for a record to be included in the resulting dataset. Records with a newer event time will be omitted from the resulting dataset. (default: None).

Type:

datetime.datetime

_feature_groups_to_be_merged#

A list of FeatureGroupToBeMerged which will be joined to base (default: []).

Type:

List[FeatureGroupToBeMerged]

_event_time_identifier_feature_type#

A FeatureTypeEnum representing the type of event time identifier feature (default: None).

Type:

FeatureTypeEnum

as_of(timestamp: datetime) DatasetBuilder[source]#

Set write_time_ending_timestamp field with provided input.

Parameters:

timestamp (datetime.datetime) – A datetime that all records’ write time in dataset will be before it.

Returns:

This DatasetBuilder object.

classmethod create(base: FeatureGroup | DataFrame, output_path: str, session: Session, record_identifier_feature_name: str | None = None, event_time_identifier_feature_name: str | None = None, included_feature_names: List[str] | None = None, kms_key_id: str | None = None) DatasetBuilder[source]#

Create a DatasetBuilder for generating a Dataset.

Parameters:
  • base – A FeatureGroup or DataFrame to use as the base.

  • output_path – S3 URI for output.

  • session – SageMaker session.

  • record_identifier_feature_name – Required if base is DataFrame.

  • event_time_identifier_feature_name – Required if base is DataFrame.

  • included_feature_names – Features to include in output.

  • kms_key_id – KMS key for encryption.

Returns:

DatasetBuilder instance.

include_deleted_records() DatasetBuilder[source]#

Include deleted records in dataset.

Returns:

This DatasetBuilder object.

include_duplicated_records() DatasetBuilder[source]#

Include duplicated records in dataset.

Returns:

This DatasetBuilder object.

point_in_time_accurate_join() DatasetBuilder[source]#

Enable point-in-time accurate join.

Returns:

This DatasetBuilder object.

to_csv_file() tuple[str, str][source]#

Get query string and result in .csv format file.

Returns:

A tuple containing:
  • str: The S3 path of the .csv file

  • str: The query string executed

Return type:

tuple

Note

This method returns a tuple (csv_path, query_string). To get just the CSV path: csv_path, _ = builder.to_csv_file()

to_dataframe() tuple[DataFrame, str][source]#

Get query string and result in pandas.DataFrame.

Returns:

A tuple containing:
  • pd.DataFrame: The pandas DataFrame object

  • str: The query string executed

Return type:

tuple

Note

This method returns a tuple (dataframe, query_string). To get just the DataFrame: df, _ = builder.to_dataframe()

with_event_time_range(starting_timestamp: datetime | None = None, ending_timestamp: datetime | None = None) DatasetBuilder[source]#

Set event_time_starting_timestamp and event_time_ending_timestamp with provided inputs.

Parameters:
  • starting_timestamp (datetime.datetime) – A datetime that all records’ event time in dataset will be after it (default: None).

  • ending_timestamp (datetime.datetime) – A datetime that all records’ event time in dataset will be before it (default: None).

Returns:

This DatasetBuilder object.

with_feature_group(feature_group: FeatureGroup, target_feature_name_in_base: str | None = None, included_feature_names: List[str] | None = None, feature_name_in_target: str | None = None, join_comparator: JoinComparatorEnum = JoinComparatorEnum.EQUALS, join_type: JoinTypeEnum = JoinTypeEnum.INNER_JOIN) DatasetBuilder[source]#

Join FeatureGroup with base.

Parameters:
  • feature_group (FeatureGroup) – A target FeatureGroup which will be joined to base.

  • target_feature_name_in_base (str) – A string representing the feature name in base which will be used as a join key (default: None).

  • included_feature_names (List[str]) – A list of strings representing features to be included in the output (default: None).

  • feature_name_in_target (str) – A string representing the feature name in the target feature group that will be compared to the target feature in the base feature group. If None is provided, the record identifier feature will be used in the SQL join. (default: None).

  • join_comparator (JoinComparatorEnum) – A JoinComparatorEnum representing the comparator used when joining the target feature in the base feature group and the feature in the target feature group. (default: JoinComparatorEnum.EQUALS).

  • join_type (JoinTypeEnum) – A JoinTypeEnum representing the type of join between the base and target feature groups. (default: JoinTypeEnum.INNER_JOIN).

Returns:

This DatasetBuilder object.

with_number_of_recent_records_by_record_identifier(n: int) DatasetBuilder[source]#

Set number_of_recent_records field with provided input.

Parameters:

n (int) – An int that how many recent records will be returned for each record identifier.

Returns:

This DatasetBuilder object.

with_number_of_records_from_query_results(n: int) DatasetBuilder[source]#

Set number_of_records field with provided input.

Parameters:

n (int) – An int that how many records will be returned.

Returns:

This DatasetBuilder object.

class sagemaker.mlops.feature_store.DeletionModeEnum(value)[source]#

Bases: Enum

Deletion modes for delete_record.

HARD_DELETE = 'HardDelete'#
SOFT_DELETE = 'SoftDelete'#
class sagemaker.mlops.feature_store.ExpirationTimeResponseEnum(value)[source]#

Bases: Enum

ExpiresAt response toggle.

DISABLED = 'Disabled'#
ENABLED = 'Enabled'#
class sagemaker.mlops.feature_store.FeatureDefinition(*, feature_name: str | PipelineVariable, feature_type: str | PipelineVariable, collection_type: str | PipelineVariable | None = Unassigned(), collection_config: CollectionConfig | None = Unassigned())[source]#

Bases: Base

A list of features. You must include FeatureName and FeatureType. Valid feature FeatureTypes are Integral, Fractional and String.

feature_name#
Type:

The name of a feature. The type must be a string. FeatureName cannot be any of the following: is_deleted, write_time, api_invocation_time. The name: Must start with an alphanumeric character. Can only include alphanumeric characters, underscores, and hyphens. Spaces are not allowed.

feature_type#
Type:

The value type of a feature. Valid values are Integral, Fractional, or String.

collection_type#
Type:

A grouping of elements where each element within the collection must have the same feature type (String, Integral, or Fractional). List: An ordered collection of elements. Set: An unordered collection of unique elements. Vector: A specialized list that represents a fixed-size array of elements. The vector dimension is determined by you. Must have elements with fractional feature types.

collection_config#
Type:

Configuration for your collection.

collection_config: CollectionConfig | None#
collection_type: str | PipelineVariable | None#
feature_name: str | PipelineVariable#
feature_type: str | PipelineVariable#
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class sagemaker.mlops.feature_store.FeatureGroup(*, feature_group_name: str | PipelineVariable, feature_group_arn: str | PipelineVariable | None = Unassigned(), record_identifier_feature_name: str | PipelineVariable | None = Unassigned(), event_time_feature_name: str | PipelineVariable | None = Unassigned(), feature_definitions: List[FeatureDefinition] | None = Unassigned(), creation_time: datetime | None = Unassigned(), last_modified_time: datetime | None = Unassigned(), online_store_config: OnlineStoreConfig | None = Unassigned(), offline_store_config: OfflineStoreConfig | None = Unassigned(), throughput_config: ThroughputConfigDescription | None = Unassigned(), role_arn: str | PipelineVariable | None = Unassigned(), feature_group_status: str | PipelineVariable | None = Unassigned(), offline_store_status: OfflineStoreStatus | None = Unassigned(), last_update_status: LastUpdateStatus | None = Unassigned(), failure_reason: str | PipelineVariable | None = Unassigned(), description: str | PipelineVariable | None = Unassigned(), next_token: str | PipelineVariable | None = Unassigned(), online_store_replicas: List[OnlineStoreReplica] | None = Unassigned(), online_store_read_write_type: str | PipelineVariable | None = Unassigned(), online_store_total_size_bytes: int | None = Unassigned(), online_store_total_item_count: int | None = Unassigned(), created_by: UserContext | None = Unassigned(), last_modified_by: UserContext | None = Unassigned())[source]#

Bases: Base

Class representing resource FeatureGroup

feature_group_arn#

The Amazon Resource Name (ARN) of the FeatureGroup.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

feature_group_name#

he name of the FeatureGroup.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable

record_identifier_feature_name#

The name of the Feature used for RecordIdentifier, whose value uniquely identifies a record stored in the feature store.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

event_time_feature_name#

The name of the feature that stores the EventTime of a Record in a FeatureGroup. An EventTime is a point in time when a new event occurs that corresponds to the creation or update of a Record in a FeatureGroup. All Records in the FeatureGroup have a corresponding EventTime.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

feature_definitions#

A list of the Features in the FeatureGroup. Each feature is defined by a FeatureName and FeatureType.

Type:

List[sagemaker.core.shapes.shapes.FeatureDefinition] | None

creation_time#

A timestamp indicating when SageMaker created the FeatureGroup.

Type:

datetime.datetime | None

next_token#

A token to resume pagination of the list of Features (FeatureDefinitions).

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

last_modified_time#

A timestamp indicating when the feature group was last updated.

Type:

datetime.datetime | None

online_store_config#

The configuration for the OnlineStore.

Type:

sagemaker.core.shapes.shapes.OnlineStoreConfig | None

offline_store_config#

The configuration of the offline store. It includes the following configurations: Amazon S3 location of the offline store. Configuration of the Glue data catalog. Table format of the offline store. Option to disable the automatic creation of a Glue table for the offline store. Encryption configuration.

Type:

sagemaker.core.shapes.shapes.OfflineStoreConfig | None

throughput_config#
Type:

sagemaker.core.shapes.shapes.ThroughputConfigDescription | None

role_arn#

The Amazon Resource Name (ARN) of the IAM execution role used to persist data into the OfflineStore if an OfflineStoreConfig is provided.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

feature_group_status#

The status of the feature group.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

offline_store_status#

The status of the OfflineStore. Notifies you if replicating data into the OfflineStore has failed. Returns either: Active or Blocked

Type:

sagemaker.core.shapes.shapes.OfflineStoreStatus | None

last_update_status#

A value indicating whether the update made to the feature group was successful.

Type:

sagemaker.core.shapes.shapes.LastUpdateStatus | None

failure_reason#

The reason that the FeatureGroup failed to be replicated in the OfflineStore. This is failure can occur because: The FeatureGroup could not be created in the OfflineStore. The FeatureGroup could not be deleted from the OfflineStore.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

description#

A free form description of the feature group.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

online_store_replicas#
Type:

List[sagemaker.core.shapes.shapes.OnlineStoreReplica] | None

online_store_read_write_type#
Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

online_store_total_size_bytes#

The size of the OnlineStore in bytes.

Type:

int | None

online_store_total_item_count#
Type:

int | None

created_by#
Type:

sagemaker.core.shapes.shapes.UserContext | None

last_modified_by#
Type:

sagemaker.core.shapes.shapes.UserContext | None

batch_get_record(identifiers: List[BatchGetRecordIdentifier], expiration_time_response: str | PipelineVariable | None = Unassigned(), session: Session | None = None, region: str | None = None) BatchGetRecordResponse | None[source]#

Retrieves a batch of Records from a FeatureGroup.

Parameters:
  • identifiers – A list containing the name or Amazon Resource Name (ARN) of the FeatureGroup, the list of names of Features to be retrieved, and the corresponding RecordIdentifier values as strings.

  • expiration_time_response – Parameter to request ExpiresAt in response. If Enabled, BatchGetRecord will return the value of ExpiresAt, if it is not null. If Disabled and null, BatchGetRecord will return null.

  • session – Boto3 session.

  • region – Region name.

Returns:

BatchGetRecordResponse

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • AccessForbidden – You do not have permission to perform an action.

  • InternalFailure – An internal failure occurred. Try your request again. If the problem persists, contact Amazon Web Services customer support.

  • ServiceUnavailable – The service is currently unavailable.

  • ValidationError – There was an error validating your request.

classmethod create(feature_group_name: str | PipelineVariable, record_identifier_feature_name: str | PipelineVariable, event_time_feature_name: str | PipelineVariable, feature_definitions: List[FeatureDefinition], online_store_config: OnlineStoreConfig | None = Unassigned(), offline_store_config: OfflineStoreConfig | None = Unassigned(), throughput_config: ThroughputConfig | None = Unassigned(), role_arn: str | PipelineVariable | None = Unassigned(), description: str | PipelineVariable | None = Unassigned(), tags: List[Tag] | None = Unassigned(), use_pre_prod_offline_store_replicator_lambda: bool | None = Unassigned(), session: Session | None = None, region: str | PipelineVariable | None = None) FeatureGroup | None[source]#

Create a FeatureGroup resource

Parameters:
  • feature_group_name – The name of the FeatureGroup. The name must be unique within an Amazon Web Services Region in an Amazon Web Services account. The name: Must start with an alphanumeric character. Can only include alphanumeric characters, underscores, and hyphens. Spaces are not allowed.

  • record_identifier_feature_name – The name of the Feature whose value uniquely identifies a Record defined in the FeatureStore. Only the latest record per identifier value will be stored in the OnlineStore. RecordIdentifierFeatureName must be one of feature definitions’ names. You use the RecordIdentifierFeatureName to access data in a FeatureStore. This name: Must start with an alphanumeric character. Can only contains alphanumeric characters, hyphens, underscores. Spaces are not allowed.

  • event_time_feature_name – The name of the feature that stores the EventTime of a Record in a FeatureGroup. An EventTime is a point in time when a new event occurs that corresponds to the creation or update of a Record in a FeatureGroup. All Records in the FeatureGroup must have a corresponding EventTime. An EventTime can be a String or Fractional. Fractional: EventTime feature values must be a Unix timestamp in seconds. String: EventTime feature values must be an ISO-8601 string in the format. The following formats are supported yyyy-MM-dd’T’HH:mm:ssZ and yyyy-MM-dd’T’HH:mm:ss.SSSZ where yyyy, MM, and dd represent the year, month, and day respectively and HH, mm, ss, and if applicable, SSS represent the hour, month, second and milliseconds respsectively. ‘T’ and Z are constants.

  • feature_definitions – A list of Feature names and types. Name and Type is compulsory per Feature. Valid feature FeatureTypes are Integral, Fractional and String. FeatureNames cannot be any of the following: is_deleted, write_time, api_invocation_time You can create up to 2,500 FeatureDefinitions per FeatureGroup.

  • online_store_config – You can turn the OnlineStore on or off by specifying True for the EnableOnlineStore flag in OnlineStoreConfig. You can also include an Amazon Web Services KMS key ID (KMSKeyId) for at-rest encryption of the OnlineStore. The default value is False.

  • offline_store_config – Use this to configure an OfflineFeatureStore. This parameter allows you to specify: The Amazon Simple Storage Service (Amazon S3) location of an OfflineStore. A configuration for an Amazon Web Services Glue or Amazon Web Services Hive data catalog. An KMS encryption key to encrypt the Amazon S3 location used for OfflineStore. If KMS encryption key is not specified, by default we encrypt all data at rest using Amazon Web Services KMS key. By defining your bucket-level key for SSE, you can reduce Amazon Web Services KMS requests costs by up to 99 percent. Format for the offline store table. Supported formats are Glue (Default) and Apache Iceberg. To learn more about this parameter, see OfflineStoreConfig.

  • throughput_config

  • role_arn – The Amazon Resource Name (ARN) of the IAM execution role used to persist data into the OfflineStore if an OfflineStoreConfig is provided.

  • description – A free-form description of a FeatureGroup.

  • tags – Tags used to identify Features in each FeatureGroup.

  • use_pre_prod_offline_store_replicator_lambda

  • session – Boto3 session.

  • region – Region name.

Returns:

The FeatureGroup resource.

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • ResourceInUse – Resource being accessed is in use.

  • ResourceLimitExceeded – You have exceeded an SageMaker resource limit. For example, you might have too many training jobs created.

  • ConfigSchemaValidationError – Raised when a configuration file does not adhere to the schema

  • LocalConfigNotFoundError – Raised when a configuration file is not found in local file system

  • S3ConfigNotFoundError – Raised when a configuration file is not found in S3

created_by: UserContext | None#
creation_time: datetime | None#
delete() None[source]#

Delete a FeatureGroup resource

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • ResourceNotFound – Resource being access is not found.

delete_record(record_identifier_value_as_string: str | PipelineVariable, event_time: str | PipelineVariable, target_stores: List[str | PipelineVariable] | None = Unassigned(), deletion_mode: str | PipelineVariable | None = Unassigned(), session: Session | None = None, region: str | None = None) None[source]#

Deletes a Record from a FeatureGroup in the OnlineStore.

Parameters:
  • record_identifier_value_as_string – The value for the RecordIdentifier that uniquely identifies the record, in string format.

  • event_time – Timestamp indicating when the deletion event occurred. EventTime can be used to query data at a certain point in time.

  • target_stores – A list of stores from which you’re deleting the record. By default, Feature Store deletes the record from all of the stores that you’re using for the FeatureGroup.

  • deletion_mode – The name of the deletion mode for deleting the record. By default, the deletion mode is set to SoftDelete.

  • session – Boto3 session.

  • region – Region name.

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • AccessForbidden – You do not have permission to perform an action.

  • InternalFailure – An internal failure occurred. Try your request again. If the problem persists, contact Amazon Web Services customer support.

  • ServiceUnavailable – The service is currently unavailable.

  • ValidationError – There was an error validating your request.

description: str | PipelineVariable | None#
event_time_feature_name: str | PipelineVariable | None#
failure_reason: str | PipelineVariable | None#
feature_definitions: List[FeatureDefinition] | None#
feature_group_arn: str | PipelineVariable | None#
feature_group_name: str | PipelineVariable#
feature_group_status: str | PipelineVariable | None#
classmethod get(feature_group_name: str | PipelineVariable, next_token: str | PipelineVariable | None = Unassigned(), session: Session | None = None, region: str | PipelineVariable | None = None) FeatureGroup | None[source]#

Get a FeatureGroup resource

Parameters:
  • feature_group_name – The name or Amazon Resource Name (ARN) of the FeatureGroup you want described.

  • next_token – A token to resume pagination of the list of Features (FeatureDefinitions). 2,500 Features are returned by default.

  • session – Boto3 session.

  • region – Region name.

Returns:

The FeatureGroup resource.

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • ResourceNotFound – Resource being access is not found.

classmethod get_all(name_contains: str | PipelineVariable | None = Unassigned(), feature_group_status_equals: str | PipelineVariable | None = Unassigned(), offline_store_status_equals: str | PipelineVariable | None = Unassigned(), creation_time_after: datetime | None = Unassigned(), creation_time_before: datetime | None = Unassigned(), sort_order: str | PipelineVariable | None = Unassigned(), sort_by: str | PipelineVariable | None = Unassigned(), session: Session | None = None, region: str | PipelineVariable | None = None) ResourceIterator[FeatureGroup][source]#

Get all FeatureGroup resources

Parameters:
  • name_contains – A string that partially matches one or more FeatureGroups names. Filters FeatureGroups by name.

  • feature_group_status_equals – A FeatureGroup status. Filters by FeatureGroup status.

  • offline_store_status_equals – An OfflineStore status. Filters by OfflineStore status.

  • creation_time_after – Use this parameter to search for FeatureGroupss created after a specific date and time.

  • creation_time_before – Use this parameter to search for FeatureGroupss created before a specific date and time.

  • sort_order – The order in which feature groups are listed.

  • sort_by – The value on which the feature group list is sorted.

  • max_results – The maximum number of results returned by ListFeatureGroups.

  • next_token – A token to resume pagination of ListFeatureGroups results.

  • session – Boto3 session.

  • region – Region name.

Returns:

Iterator for listed FeatureGroup resources.

Raises:

botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

get_name() str[source]#
get_record(record_identifier_value_as_string: str | PipelineVariable, feature_names: List[str | PipelineVariable] | None = Unassigned(), expiration_time_response: str | PipelineVariable | None = Unassigned(), session: Session | None = None, region: str | None = None) GetRecordResponse | None[source]#

Use for OnlineStore serving from a FeatureStore.

Parameters:
  • record_identifier_value_as_string – The value that corresponds to RecordIdentifier type and uniquely identifies the record in the FeatureGroup.

  • feature_names – List of names of Features to be retrieved. If not specified, the latest value for all the Features are returned.

  • expiration_time_response – Parameter to request ExpiresAt in response. If Enabled, GetRecord will return the value of ExpiresAt, if it is not null. If Disabled and null, GetRecord will return null.

  • session – Boto3 session.

  • region – Region name.

Returns:

GetRecordResponse

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • AccessForbidden – You do not have permission to perform an action.

  • InternalFailure – An internal failure occurred. Try your request again. If the problem persists, contact Amazon Web Services customer support.

  • ResourceNotFound – Resource being access is not found.

  • ServiceUnavailable – The service is currently unavailable.

  • ValidationError – There was an error validating your request.

last_modified_by: UserContext | None#
last_modified_time: datetime | None#
last_update_status: LastUpdateStatus | None#
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

next_token: str | PipelineVariable | None#
offline_store_config: OfflineStoreConfig | None#
offline_store_status: OfflineStoreStatus | None#
online_store_config: OnlineStoreConfig | None#
online_store_read_write_type: str | PipelineVariable | None#
online_store_replicas: List[OnlineStoreReplica] | None#
online_store_total_item_count: int | None#
online_store_total_size_bytes: int | None#
populate_inputs_decorator()[source]#
put_record(record: List[FeatureValue], target_stores: List[str | PipelineVariable] | None = Unassigned(), ttl_duration: TtlDuration | None = Unassigned(), session: Session | None = None, region: str | None = None) None[source]#

The PutRecord API is used to ingest a list of Records into your feature group.

Parameters:
  • record – List of FeatureValues to be inserted. This will be a full over-write. If you only want to update few of the feature values, do the following: Use GetRecord to retrieve the latest record. Update the record returned from GetRecord. Use PutRecord to update feature values.

  • target_stores – A list of stores to which you’re adding the record. By default, Feature Store adds the record to all of the stores that you’re using for the FeatureGroup.

  • ttl_duration – Time to live duration, where the record is hard deleted after the expiration time is reached; ExpiresAt = EventTime + TtlDuration. For information on HardDelete, see the DeleteRecord API in the Amazon SageMaker API Reference guide.

  • session – Boto3 session.

  • region – Region name.

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • AccessForbidden – You do not have permission to perform an action.

  • InternalFailure – An internal failure occurred. Try your request again. If the problem persists, contact Amazon Web Services customer support.

  • ServiceUnavailable – The service is currently unavailable.

  • ValidationError – There was an error validating your request.

record_identifier_feature_name: str | PipelineVariable | None#
refresh() FeatureGroup | None[source]#

Refresh a FeatureGroup resource

Returns:

The FeatureGroup resource.

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • ResourceNotFound – Resource being access is not found.

role_arn: str | PipelineVariable | None#
throughput_config: ThroughputConfigDescription | None#
update(add_online_store_replica: AddOnlineStoreReplicaAction | None = Unassigned(), feature_additions: List[FeatureDefinition] | None = Unassigned(), online_store_config: OnlineStoreConfigUpdate | None = Unassigned(), description: str | PipelineVariable | None = Unassigned(), throughput_config: ThroughputConfigUpdate | None = Unassigned()) FeatureGroup | None[source]#

Update a FeatureGroup resource

Parameters:
  • add_online_store_replica

  • feature_additions – Updates the feature group. Updating a feature group is an asynchronous operation. When you get an HTTP 200 response, you’ve made a valid request. It takes some time after you’ve made a valid request for Feature Store to update the feature group.

Returns:

The FeatureGroup resource.

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • ResourceLimitExceeded – You have exceeded an SageMaker resource limit. For example, you might have too many training jobs created.

  • ResourceNotFound – Resource being access is not found.

wait_for_delete(poll: int = 5, timeout: int | None = None) None[source]#

Wait for a FeatureGroup resource to be deleted.

Parameters:
  • poll – The number of seconds to wait between each poll.

  • timeout – The maximum number of seconds to wait before timing out.

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • TimeoutExceededError – If the resource does not reach a terminal state before the timeout.

  • DeleteFailedStatusError – If the resource reaches a failed state.

  • WaiterError – Raised when an error occurs while waiting.

wait_for_status(target_status: Literal['Creating', 'Created', 'CreateFailed', 'Deleting', 'DeleteFailed'], poll: int = 5, timeout: int | None = None) None[source]#

Wait for a FeatureGroup resource to reach certain status.

Parameters:
  • target_status – The status to wait for.

  • poll – The number of seconds to wait between each poll.

  • timeout – The maximum number of seconds to wait before timing out.

Raises:
class sagemaker.mlops.feature_store.FeatureGroupToBeMerged(features: List[str], included_feature_names: List[str], projected_feature_names: List[str], catalog: str, database: str, table_name: str, record_identifier_feature_name: str, event_time_identifier_feature: FeatureDefinition, target_feature_name_in_base: str | None = None, table_type: TableType | None = None, feature_name_in_target: str | None = None, join_comparator: JoinComparatorEnum = JoinComparatorEnum.EQUALS, join_type: JoinTypeEnum = JoinTypeEnum.INNER_JOIN)[source]#

Bases: object

FeatureGroup metadata which will be used for SQL join.

This class instantiates a FeatureGroupToBeMerged object that comprises a list of feature names, a list of feature names which will be included in SQL query, a database, an Athena table name, a feature name of record identifier, a feature name of event time identifier and a feature name of base which is the target join key.

features#

A list of strings representing feature names of this FeatureGroup.

Type:

List[str]

included_feature_names#

A list of strings representing features to be included in the SQL join.

Type:

List[str]

projected_feature_names#

A list of strings representing features to be included for final projection in output.

Type:

List[str]

catalog#

A string representing the catalog.

Type:

str

database#

A string representing the database.

Type:

str

table_name#

A string representing the Athena table name of this FeatureGroup.

Type:

str

record_identifier_feature_name#

A string representing the record identifier feature.

Type:

str

event_time_identifier_feature#

A FeatureDefinition representing the event time identifier feature.

Type:

FeatureDefinition

target_feature_name_in_base#

A string representing the feature name in base which will be used as target join key (default: None).

Type:

str

table_type#

A TableType representing the type of table if it is Feature Group or Panda Data Frame (default: None).

Type:

TableType

feature_name_in_target#

A string representing the feature name in the target feature group that will be compared to the target feature in the base feature group. If None is provided, the record identifier feature will be used in the SQL join. (default: None).

Type:

str

join_comparator#

A JoinComparatorEnum representing the comparator used when joining the target feature in the base feature group and the feature in the target feature group. (default: JoinComparatorEnum.EQUALS).

Type:

JoinComparatorEnum

join_type#

A JoinTypeEnum representing the type of join between the base and target feature groups. (default: JoinTypeEnum.INNER_JOIN).

Type:

JoinTypeEnum

catalog: str#
database: str#
event_time_identifier_feature: FeatureDefinition#
feature_name_in_target: str = None#
features: List[str]#
included_feature_names: List[str]#
join_comparator: JoinComparatorEnum = '='#
join_type: JoinTypeEnum = 'JOIN'#
projected_feature_names: List[str]#
record_identifier_feature_name: str#
table_name: str#
table_type: TableType = None#
target_feature_name_in_base: str = None#
class sagemaker.mlops.feature_store.FeatureMetadata(*, feature_group_name: str | PipelineVariable, feature_name: str | PipelineVariable, feature_group_arn: str | PipelineVariable | None = Unassigned(), feature_identifier: str | PipelineVariable | None = Unassigned(), feature_type: str | PipelineVariable | None = Unassigned(), creation_time: datetime | None = Unassigned(), last_modified_time: datetime | None = Unassigned(), description: str | PipelineVariable | None = Unassigned(), parameters: List[FeatureParameter] | None = Unassigned())[source]#

Bases: Base

Class representing resource FeatureMetadata

feature_group_arn#

The Amazon Resource Number (ARN) of the feature group that contains the feature.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

feature_group_name#

The name of the feature group that you’ve specified.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable

feature_name#

The name of the feature that you’ve specified.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable

feature_type#

The data type of the feature.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

creation_time#

A timestamp indicating when the feature was created.

Type:

datetime.datetime | None

last_modified_time#

A timestamp indicating when the metadata for the feature group was modified. For example, if you add a parameter describing the feature, the timestamp changes to reflect the last time you

Type:

datetime.datetime | None

feature_identifier#
Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

description#

The description you added to describe the feature.

Type:

str | sagemaker.core.helper.pipeline_variable.PipelineVariable | None

parameters#

The key-value pairs that you added to describe the feature.

Type:

List[sagemaker.core.shapes.shapes.FeatureParameter] | None

creation_time: datetime | None#
description: str | PipelineVariable | None#
feature_group_arn: str | PipelineVariable | None#
feature_group_name: str | PipelineVariable#
feature_identifier: str | PipelineVariable | None#
feature_name: str | PipelineVariable#
feature_type: str | PipelineVariable | None#
classmethod get(feature_group_name: str | PipelineVariable, feature_name: str | PipelineVariable, session: Session | None = None, region: str | PipelineVariable | None = None) FeatureMetadata | None[source]#

Get a FeatureMetadata resource

Parameters:
  • feature_group_name – The name or Amazon Resource Name (ARN) of the feature group containing the feature.

  • feature_name – The name of the feature.

  • session – Boto3 session.

  • region – Region name.

Returns:

The FeatureMetadata resource.

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • ResourceNotFound – Resource being access is not found.

get_name() str[source]#
last_modified_time: datetime | None#
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

parameters: List[FeatureParameter] | None#
refresh() FeatureMetadata | None[source]#

Refresh a FeatureMetadata resource

Returns:

The FeatureMetadata resource.

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • ResourceNotFound – Resource being access is not found.

update(description: str | PipelineVariable | None = Unassigned(), parameter_additions: List[FeatureParameter] | None = Unassigned(), parameter_removals: List[str | PipelineVariable] | None = Unassigned()) FeatureMetadata | None[source]#

Update a FeatureMetadata resource

Parameters:
  • parameter_additions – A list of key-value pairs that you can add to better describe the feature.

  • parameter_removals – A list of parameter keys that you can specify to remove parameters that describe your feature.

Returns:

The FeatureMetadata resource.

Raises:
  • botocore.exceptions.ClientError – This exception is raised for AWS service related errors. The error message and error code can be parsed from the exception as follows: `     try:         # AWS service call here     except botocore.exceptions.ClientError as e:         error_message = e.response['Error']['Message']         error_code = e.response['Error']['Code']     `

  • ResourceNotFound – Resource being access is not found.

class sagemaker.mlops.feature_store.FeatureParameter(*, key: str | PipelineVariable | None = Unassigned(), value: str | PipelineVariable | None = Unassigned())[source]#

Bases: Base

A key-value pair that you specify to describe the feature.

key#
Type:

A key that must contain a value to describe the feature.

value#
Type:

The value that belongs to a key.

key: str | PipelineVariable | None#
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

value: str | PipelineVariable | None#
class sagemaker.mlops.feature_store.FeatureTypeEnum(value)[source]#

Bases: Enum

Feature data types: Fractional, Integral, or String.

FRACTIONAL = 'Fractional'#
INTEGRAL = 'Integral'#
STRING = 'String'#
class sagemaker.mlops.feature_store.FeatureValue(*, feature_name: str | PipelineVariable, value_as_string: str | PipelineVariable | None = Unassigned(), value_as_string_list: List[str | PipelineVariable] | None = Unassigned())[source]#

Bases: Base

The value associated with a feature.

feature_name#
Type:

The name of a feature that a feature value corresponds to.

value_as_string#
Type:

The value in string format associated with a feature. Used when your CollectionType is None. Note that features types can be String, Integral, or Fractional. This value represents all three types as a string.

value_as_string_list#
Type:

The list of values in string format associated with a feature. Used when your CollectionType is a List, Set, or Vector. Note that features types can be String, Integral, or Fractional. These values represents all three types as a string.

feature_name: str | PipelineVariable#
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

value_as_string: str | PipelineVariable | None#
value_as_string_list: List[str | PipelineVariable] | None#
class sagemaker.mlops.feature_store.Filter(*, name: str | PipelineVariable, operator: str | PipelineVariable | None = Unassigned(), value: str | PipelineVariable | None = Unassigned())[source]#

Bases: Base

A conditional statement for a search expression that includes a resource property, a Boolean operator, and a value. Resources that match the statement are returned in the results from the Search API. If you specify a Value, but not an Operator, SageMaker uses the equals operator. In search, there are several property types: Metrics To define a metric filter, enter a value using the form “Metrics.&lt;name&gt;”, where &lt;name&gt; is a metric name. For example, the following filter searches for training jobs with an “accuracy” metric greater than “0.9”: { “Name”: “Metrics.accuracy”, “Operator”: “GreaterThan”, “Value”: “0.9” } HyperParameters To define a hyperparameter filter, enter a value with the form “HyperParameters.&lt;name&gt;”. Decimal hyperparameter values are treated as a decimal in a comparison if the specified Value is also a decimal value. If the specified Value is an integer, the decimal hyperparameter values are treated as integers. For example, the following filter is satisfied by training jobs with a “learning_rate” hyperparameter that is less than “0.5”: { “Name”: “HyperParameters.learning_rate”, “Operator”: “LessThan”, “Value”: “0.5” } Tags To define a tag filter, enter a value with the form Tags.&lt;key&gt;.

name#
Type:

A resource property name. For example, TrainingJobName. For valid property names, see SearchRecord. You must specify a valid property for the resource.

operator#
Type:

A Boolean binary operator that is used to evaluate the filter. The operator field contains one of the following values: Equals The value of Name equals Value. NotEquals The value of Name doesn’t equal Value. Exists The Name property exists. NotExists The Name property does not exist. GreaterThan The value of Name is greater than Value. Not supported for text properties. GreaterThanOrEqualTo The value of Name is greater than or equal to Value. Not supported for text properties. LessThan The value of Name is less than Value. Not supported for text properties. LessThanOrEqualTo The value of Name is less than or equal to Value. Not supported for text properties. In The value of Name is one of the comma delimited strings in Value. Only supported for text properties. Contains The value of Name contains the string Value. Only supported for text properties. A SearchExpression can include the Contains operator multiple times when the value of Name is one of the following: Experiment.DisplayName Experiment.ExperimentName Experiment.Tags Trial.DisplayName Trial.TrialName Trial.Tags TrialComponent.DisplayName TrialComponent.TrialComponentName TrialComponent.Tags TrialComponent.InputArtifacts TrialComponent.OutputArtifacts A SearchExpression can include only one Contains operator for all other values of Name. In these cases, if you include multiple Contains operators in the SearchExpression, the result is the following error message: “‘CONTAINS’ operator usage limit of 1 exceeded.”

value#
Type:

A value used with Name and Operator to determine which resources satisfy the filter’s condition. For numerical properties, Value must be an integer or floating-point decimal. For timestamp properties, Value must be an ISO 8601 date-time string of the following format: YYYY-mm-dd’T’HH:MM:SS.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

name: str | PipelineVariable#
operator: str | PipelineVariable | None#
value: str | PipelineVariable | None#
class sagemaker.mlops.feature_store.FilterOperatorEnum(value)[source]#

Bases: Enum

Filter operators.

CONTAINS = 'Contains'#
EQUALS = 'Equals'#
EXISTS = 'Exists'#
GREATER_THAN = 'GreaterThan'#
GREATER_THAN_OR_EQUAL_TO = 'GreaterThanOrEqualTo'#
IN = 'In'#
LESS_THAN = 'LessThan'#
LESS_THAN_OR_EQUAL_TO = 'LessThanOrEqualTo'#
NOT_EQUALS = 'NotEquals'#
NOT_EXISTS = 'NotExists'#
sagemaker.mlops.feature_store.FractionalFeatureDefinition(feature_name: str, collection_type: ListCollectionType | SetCollectionType | VectorCollectionType | None = None) FeatureDefinition[source]#

Create a feature definition with Fractional type.

exception sagemaker.mlops.feature_store.IngestionError(failed_rows: List[int], message: str)[source]#

Bases: Exception

Exception raised for errors during ingestion.

failed_rows#

List of row indices that failed to ingest.

message#

Error message.

class sagemaker.mlops.feature_store.IngestionManagerPandas(feature_group_name: str, feature_definitions: Dict[str, Dict[Any, Any]], max_workers: int = 1, max_processes: int = 1)[source]#

Bases: object

Class to manage the multi-threaded data ingestion process.

This class will manage the data ingestion process which is multi-threaded.

feature_group_name#

name of the Feature Group.

Type:

str

feature_definitions#

dictionary of feature definitions where the key is the feature name and the value is the FeatureDefinition. The FeatureDefinition contains the data type of the feature.

Type:

Dict[str, Dict[Any, Any]]

max_workers#

number of threads to create.

Type:

int

max_processes#

number of processes to create. Each process spawns max_workers threads.

Type:

int

property failed_rows: List[int]#

Get rows that failed to ingest.

Returns:

List of row indices that failed to be ingested.

feature_definitions: Dict[str, Dict[Any, Any]]#
feature_group_name: str#
max_processes: int = 1#
max_workers: int = 1#
run(data_frame: DataFrame, target_stores: List[str] = None, wait: bool = True, timeout: int | float = None)[source]#

Start the ingestion process.

Parameters:
  • data_frame (DataFrame) – source DataFrame to be ingested.

  • target_stores (List[str]) – list of target stores (“OnlineStore”, “OfflineStore”). If None, the default target store is used.

  • wait (bool) – whether to wait for the ingestion to finish or not.

  • timeout (Union[int, float]) – concurrent.futures.TimeoutError will be raised if timeout is reached.

Raises:

ValueError – If wait=False with max_workers=1 and max_processes=1.

wait(timeout: int | float | None = None)[source]#

Wait for the ingestion process to finish.

Parameters:

timeout (Union[int, float]) – concurrent.futures.TimeoutError will be raised if timeout is reached.

sagemaker.mlops.feature_store.IntegralFeatureDefinition(feature_name: str, collection_type: ListCollectionType | SetCollectionType | VectorCollectionType | None = None) FeatureDefinition[source]#

Create a feature definition with Integral type.

class sagemaker.mlops.feature_store.JoinComparatorEnum(value)[source]#

Bases: Enum

An enumeration.

EQUALS = '='#
GREATER_THAN = '>'#
GREATER_THAN_OR_EQUAL_TO = '>='#
LESS_THAN = '<'#
LESS_THAN_OR_EQUAL_TO = '<='#
NOT_EQUAL_TO = '<>'#
class sagemaker.mlops.feature_store.JoinTypeEnum(value)[source]#

Bases: Enum

An enumeration.

CROSS_JOIN = 'CROSS JOIN'#
FULL_JOIN = 'FULL JOIN'#
INNER_JOIN = 'JOIN'#
LEFT_JOIN = 'LEFT JOIN'#
RIGHT_JOIN = 'RIGHT JOIN'#
class sagemaker.mlops.feature_store.ListCollectionType[source]#

Bases: object

List collection type.

collection_config = None#
collection_type = 'List'#
class sagemaker.mlops.feature_store.OfflineStoreConfig(*, s3_storage_config: S3StorageConfig, disable_glue_table_creation: bool | None = Unassigned(), data_catalog_config: DataCatalogConfig | None = Unassigned(), table_format: str | PipelineVariable | None = Unassigned())[source]#

Bases: Base

The configuration of an OfflineStore. Provide an OfflineStoreConfig in a request to CreateFeatureGroup to create an OfflineStore. To encrypt an OfflineStore using at rest data encryption, specify Amazon Web Services Key Management Service (KMS) key ID, or KMSKeyId, in S3StorageConfig.

s3_storage_config#
Type:

The Amazon Simple Storage (Amazon S3) location of OfflineStore.

disable_glue_table_creation#
Type:

Set to True to disable the automatic creation of an Amazon Web Services Glue table when configuring an OfflineStore. If set to False, Feature Store will name the OfflineStore Glue table following Athena’s naming recommendations. The default value is False.

data_catalog_config#
Type:

The meta data of the Glue table that is autogenerated when an OfflineStore is created.

table_format#
Type:

Format for the offline store table. Supported formats are Glue (Default) and Apache Iceberg.

data_catalog_config: DataCatalogConfig | None#
disable_glue_table_creation: bool | None#
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

s3_storage_config: S3StorageConfig#
table_format: str | PipelineVariable | None#
class sagemaker.mlops.feature_store.OnlineStoreConfig(*, security_config: OnlineStoreSecurityConfig | None = Unassigned(), enable_online_store: bool | None = Unassigned(), ttl_duration: TtlDuration | None = Unassigned(), storage_type: str | PipelineVariable | None = Unassigned())[source]#

Bases: Base

Use this to specify the Amazon Web Services Key Management Service (KMS) Key ID, or KMSKeyId, for at rest data encryption. You can turn OnlineStore on or off by specifying the EnableOnlineStore flag at General Assembly. The default value is False.

security_config#
Type:

Use to specify KMS Key ID (KMSKeyId) for at-rest encryption of your OnlineStore.

enable_online_store#
Type:

Turn OnlineStore off by specifying False for the EnableOnlineStore flag. Turn OnlineStore on by specifying True for the EnableOnlineStore flag. The default value is False.

ttl_duration#
Type:

Time to live duration, where the record is hard deleted after the expiration time is reached; ExpiresAt = EventTime + TtlDuration. For information on HardDelete, see the DeleteRecord API in the Amazon SageMaker API Reference guide.

storage_type#
Type:

Option for different tiers of low latency storage for real-time data retrieval. Standard: A managed low latency data store for feature groups. InMemory: A managed data store for feature groups that supports very low latency retrieval.

enable_online_store: bool | None#
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

security_config: OnlineStoreSecurityConfig | None#
storage_type: str | PipelineVariable | None#
ttl_duration: TtlDuration | None#
class sagemaker.mlops.feature_store.OnlineStoreSecurityConfig(*, kms_key_id: str | PipelineVariable | None = Unassigned())[source]#

Bases: Base

The security configuration for OnlineStore.

kms_key_id#
Type:

The Amazon Web Services Key Management Service (KMS) key ARN that SageMaker Feature Store uses to encrypt the Amazon S3 objects at rest using Amazon S3 server-side encryption. The caller (either user or IAM role) of CreateFeatureGroup must have below permissions to the OnlineStore KmsKeyId: “kms:Encrypt” “kms:Decrypt” “kms:DescribeKey” “kms:CreateGrant” “kms:RetireGrant” “kms:ReEncryptFrom” “kms:ReEncryptTo” “kms:GenerateDataKey” “kms:ListAliases” “kms:ListGrants” “kms:RevokeGrant” The caller (either user or IAM role) to all DataPlane operations (PutRecord, GetRecord, DeleteRecord) must have the following permissions to the KmsKeyId: “kms:Decrypt”

kms_key_id: str | PipelineVariable | None#
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class sagemaker.mlops.feature_store.OnlineStoreStorageTypeEnum(value)[source]#

Bases: Enum

Storage types for online store.

IN_MEMORY = 'InMemory'#
STANDARD = 'Standard'#
class sagemaker.mlops.feature_store.ResourceEnum(value)[source]#

Bases: Enum

Resource types for search.

FEATURE_GROUP = 'FeatureGroup'#
FEATURE_METADATA = 'FeatureMetadata'#
class sagemaker.mlops.feature_store.S3StorageConfig(*, s3_uri: str | PipelineVariable, kms_key_id: str | PipelineVariable | None = Unassigned(), resolved_output_s3_uri: str | PipelineVariable | None = Unassigned())[source]#

Bases: Base

The Amazon Simple Storage (Amazon S3) location and security configuration for OfflineStore.

s3_uri#
Type:

The S3 URI, or location in Amazon S3, of OfflineStore. S3 URIs have a format similar to the following: s3://example-bucket/prefix/.

kms_key_id#
Type:

The Amazon Web Services Key Management Service (KMS) key ARN of the key used to encrypt any objects written into the OfflineStore S3 location. The IAM roleARN that is passed as a parameter to CreateFeatureGroup must have below permissions to the KmsKeyId: “kms:GenerateDataKey”

resolved_output_s3_uri#
Type:

The S3 path where offline records are written.

kms_key_id: str | PipelineVariable | None#
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

resolved_output_s3_uri: str | PipelineVariable | None#
s3_uri: str | PipelineVariable#
class sagemaker.mlops.feature_store.SearchExpression(*, filters: List[Filter] | None = Unassigned(), nested_filters: List[NestedFilters] | None = Unassigned(), sub_expressions: List[SearchExpression] | None = Unassigned(), operator: str | PipelineVariable | None = Unassigned())[source]#

Bases: Base

A multi-expression that searches for the specified resource or resources in a search. All resource objects that satisfy the expression’s condition are included in the search results. You must specify at least one subexpression, filter, or nested filter. A SearchExpression can contain up to twenty elements. A SearchExpression contains the following components: A list of Filter objects. Each filter defines a simple Boolean expression comprised of a resource property name, Boolean operator, and value. A list of NestedFilter objects. Each nested filter defines a list of Boolean expressions using a list of resource properties. A nested filter is satisfied if a single object in the list satisfies all Boolean expressions. A list of SearchExpression objects. A search expression object can be nested in a list of search expression objects. A Boolean operator: And or Or.

filters#
Type:

A list of filter objects.

nested_filters#
Type:

A list of nested filter objects.

sub_expressions#
Type:

A list of search expression objects.

operator#
Type:

A Boolean operator used to evaluate the search expression. If you want every conditional statement in all lists to be satisfied for the entire search expression to be true, specify And. If only a single conditional statement needs to be true for the entire search expression to be true, specify Or. The default value is And.

filters: List[Filter] | None#
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

nested_filters: List[NestedFilters] | None#
operator: str | PipelineVariable | None#
sub_expressions: List[SearchExpression] | None#
class sagemaker.mlops.feature_store.SearchOperatorEnum(value)[source]#

Bases: Enum

Search operators.

AND = 'And'#
OR = 'Or'#
class sagemaker.mlops.feature_store.SetCollectionType[source]#

Bases: object

Set collection type.

collection_config = None#
collection_type = 'Set'#
class sagemaker.mlops.feature_store.SortOrderEnum(value)[source]#

Bases: Enum

Sort orders.

ASCENDING = 'Ascending'#
DESCENDING = 'Descending'#
sagemaker.mlops.feature_store.StringFeatureDefinition(feature_name: str, collection_type: ListCollectionType | SetCollectionType | VectorCollectionType | None = None) FeatureDefinition[source]#

Create a feature definition with String type.

class sagemaker.mlops.feature_store.TableFormatEnum(value)[source]#

Bases: Enum

Offline store table formats.

GLUE = 'Glue'#
ICEBERG = 'Iceberg'#
class sagemaker.mlops.feature_store.TableType(value)[source]#

Bases: Enum

An enumeration.

DATA_FRAME = 'DataFrame'#
FEATURE_GROUP = 'FeatureGroup'#
class sagemaker.mlops.feature_store.TargetStoreEnum(value)[source]#

Bases: Enum

Store types for put_record.

OFFLINE_STORE = 'OfflineStore'#
ONLINE_STORE = 'OnlineStore'#
class sagemaker.mlops.feature_store.ThroughputConfig(*, throughput_mode: str | PipelineVariable, provisioned_read_capacity_units: int | None = Unassigned(), provisioned_write_capacity_units: int | None = Unassigned())[source]#

Bases: Base

Used to set feature group throughput configuration. There are two modes: ON_DEMAND and PROVISIONED. With on-demand mode, you are charged for data reads and writes that your application performs on your feature group. You do not need to specify read and write throughput because Feature Store accommodates your workloads as they ramp up and down. You can switch a feature group to on-demand only once in a 24 hour period. With provisioned throughput mode, you specify the read and write capacity per second that you expect your application to require, and you are billed based on those limits. Exceeding provisioned throughput will result in your requests being throttled. Note: PROVISIONED throughput mode is supported only for feature groups that are offline-only, or use the Standard tier online store.

throughput_mode#
Type:

The mode used for your feature group throughput: ON_DEMAND or PROVISIONED.

provisioned_read_capacity_units#
Type:

For provisioned feature groups with online store enabled, this indicates the read throughput you are billed for and can consume without throttling. This field is not applicable for on-demand feature groups.

provisioned_write_capacity_units#
Type:

For provisioned feature groups, this indicates the write throughput you are billed for and can consume without throttling. This field is not applicable for on-demand feature groups.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

provisioned_read_capacity_units: int | None#
provisioned_write_capacity_units: int | None#
throughput_mode: str | PipelineVariable#
class sagemaker.mlops.feature_store.ThroughputModeEnum(value)[source]#

Bases: Enum

Throughput modes for feature group.

ON_DEMAND = 'OnDemand'#
PROVISIONED = 'Provisioned'#
class sagemaker.mlops.feature_store.TtlDuration(*, unit: str | PipelineVariable | None = Unassigned(), value: int | None = Unassigned())[source]#

Bases: Base

Time to live duration, where the record is hard deleted after the expiration time is reached; ExpiresAt = EventTime + TtlDuration. For information on HardDelete, see the DeleteRecord API in the Amazon SageMaker API Reference guide.

unit#
Type:

TtlDuration time unit.

value#
Type:

TtlDuration time value.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

unit: str | PipelineVariable | None#
value: int | None#
class sagemaker.mlops.feature_store.VectorCollectionType(dimension: int)[source]#

Bases: object

Vector collection type with dimension.

collection_type = 'Vector'#
sagemaker.mlops.feature_store.as_hive_ddl(feature_group_name: str, database: str = 'sagemaker_featurestore', table_name: str | None = None) str[source]#

Generate Hive DDL for a FeatureGroup’s offline store table.

Schema of the table is generated based on the feature definitions. Columns are named after feature name and data-type are inferred based on feature type. Integral feature type is mapped to INT data-type. Fractional feature type is mapped to FLOAT data-type. String feature type is mapped to STRING data-type.

Parameters:
  • feature_group_name – Name of the FeatureGroup.

  • database – Hive database name (default: “sagemaker_featurestore”).

  • table_name – Hive table name (default: feature_group_name).

Returns:

CREATE EXTERNAL TABLE DDL string.

sagemaker.mlops.feature_store.create_athena_query(feature_group_name: str, session: Session)[source]#

Create an AthenaQuery for a FeatureGroup.

Parameters:
  • feature_group_name – Name of the FeatureGroup.

  • session – Session instance for Athena boto calls.

Returns:

AthenaQuery initialized with data catalog config.

Raises:

RuntimeError – If no metastore is configured.

sagemaker.mlops.feature_store.get_session_from_role(region: str, assume_role: str | None = None) Session[source]#

Get a Session from a region and optional IAM role.

Parameters:
  • region – AWS region name.

  • assume_role – IAM role ARN to assume (default: None).

Returns:

Session instance.

sagemaker.mlops.feature_store.ingest_dataframe(feature_group_name: str, data_frame: DataFrame, max_workers: int = 1, max_processes: int = 1, wait: bool = True, timeout: int | float = None)[source]#

Ingest a pandas DataFrame to a FeatureGroup.

Parameters:
  • feature_group_name – Name of the FeatureGroup.

  • data_frame – DataFrame to ingest.

  • max_workers – Threads per process (default: 1).

  • max_processes – Number of processes (default: 1).

  • wait – Wait for ingestion to complete (default: True).

  • timeout – Timeout in seconds (default: None).

Returns:

IngestionManagerPandas instance.

Raises:

ValueError – If max_workers or max_processes <= 0.

sagemaker.mlops.feature_store.load_feature_definitions_from_dataframe(data_frame: DataFrame, online_storage_type: str | None = None) Sequence[FeatureDefinition][source]#

Infer FeatureDefinitions from DataFrame dtypes.

Column name is used as feature name. Feature type is inferred from the dtype of the column. Integer dtypes are mapped to Integral feature type. Float dtypes are mapped to Fractional feature type. All other dtypes are mapped to String.

For IN_MEMORY online_storage_type, collection type columns within DataFrame will be inferred as List instead of String.

Parameters:
  • data_frame – DataFrame to infer features from.

  • online_storage_type – “Standard” or “InMemory” (default: None).

Returns:

List of FeatureDefinition objects.