sagemaker.mlops.feature_store.ingestion_manager_pandas

Multi-threaded data ingestion for FeatureStore using SageMaker Core.

Classes

IngestionManagerPandas(feature_group_name, ...)

Class to manage the multi-threaded data ingestion process.

Exceptions

IngestionError(failed_rows, message)

Exception raised for errors during ingestion.

exception sagemaker.mlops.feature_store.ingestion_manager_pandas.IngestionError(failed_rows: List[int], message: str)

Bases: Exception

Exception raised for errors during ingestion.

failed_rows

List of row indices that failed to ingest.

message

Error message.
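
The sketch below shows one way a caller might react to this exception. To stay runnable without the SageMaker SDK installed, it defines a local stand-in class that takes the same documented constructor arguments (failed_rows, message). The function ingest_rows and the sample records are hypothetical and exist only for illustration.

```python
from typing import Dict, List


class IngestionError(Exception):
    """Local stand-in mirroring the documented signature (not the SDK class)."""

    def __init__(self, failed_rows: List[int], message: str):
        super().__init__(message)
        self.failed_rows = failed_rows
        self.message = message


def ingest_rows(rows: List[Dict[str, object]]) -> None:
    """Hypothetical ingestion step: rejects rows that lack a record id."""
    failed = [i for i, row in enumerate(rows) if row.get("record_id") is None]
    if failed:
        raise IngestionError(failed, f"Failed to ingest {len(failed)} row(s)")


rows = [
    {"record_id": "a", "value": 1.0},
    {"record_id": None, "value": 2.0},
    {"record_id": "c", "value": 3.0},
]

retry_rows: List[Dict[str, object]] = []
error_message = ""
try:
    ingest_rows(rows)
except IngestionError as err:
    # failed_rows holds row indices, so it can select the rows to retry.
    retry_rows = [rows[i] for i in err.failed_rows]
    error_message = err.message
```

Because failed_rows carries indices rather than the rows themselves, the caller needs to keep the original data around if it intends to retry.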

class sagemaker.mlops.feature_store.ingestion_manager_pandas.IngestionManagerPandas(feature_group_name: str, feature_definitions: Dict[str, Dict[Any, Any]], max_workers: int = 1, max_processes: int = 1)

Bases: object

Class to manage the multi-threaded data ingestion process.

Ingestion work is spread across max_processes processes, each of which spawns max_workers threads.

feature_group_name

Name of the feature group.

Type:

str

feature_definitions

Dictionary of feature definitions, keyed by feature name. Each value is a FeatureDefinition, which holds the data type of the feature.

Type:

Dict[str, Dict[Any, Any]]

max_workers

Number of threads to create in each process.

Type:

int

max_processes

Number of processes to create. Each process spawns max_workers threads.

Type:

int
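
Since each process spawns max_workers threads, the upper bound on concurrent ingestion threads is max_processes * max_workers. The sketch below illustrates one plausible way rows could be divided among them. The contiguous-chunk scheme and the helper names split and partition_rows are assumptions made for illustration, not behavior documented for this class.

```python
import math
from typing import List


def split(indices: List[int], parts: int) -> List[List[int]]:
    """Split indices into at most `parts` contiguous chunks of near-equal size."""
    if not indices:
        return []
    size = math.ceil(len(indices) / parts)
    return [indices[i:i + size] for i in range(0, len(indices), size)]


def partition_rows(num_rows: int, max_workers: int, max_processes: int) -> List[List[List[int]]]:
    """Return row indices grouped as [process][thread][row] (assumed scheme)."""
    per_process = split(list(range(num_rows)), max_processes)
    return [split(chunk, max_workers) for chunk in per_process]


plan = partition_rows(num_rows=10, max_workers=2, max_processes=2)
# Number of thread-level chunks actually produced across all processes.
total_threads = sum(len(process) for process in plan)
```

With 10 rows, 2 processes, and 2 workers, this scheme yields four chunks, one per thread.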

property failed_rows: List[int]

Get rows that failed to ingest.

Returns:

List of row indices that failed to be ingested.

feature_definitions: Dict[str, Dict[Any, Any]]

feature_group_name: str

max_processes: int = 1

max_workers: int = 1

run(data_frame: DataFrame, target_stores: List[str] = None, wait: bool = True, timeout: int | float = None)

Start the ingestion process.

Parameters:
  • data_frame (DataFrame) – Source DataFrame to ingest.

  • target_stores (List[str]) – List of target stores (“OnlineStore”, “OfflineStore”). If None, the default target store is used.

  • wait (bool) – Whether to block until ingestion finishes. Defaults to True.

  • timeout (Union[int, float]) – Time limit for the ingestion. concurrent.futures.TimeoutError is raised if the limit is reached. Defaults to None.

Raises:

ValueError – If wait=False with max_workers=1 and max_processes=1.
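
The ValueError rule means a non-blocking call is only available when more than one worker or process is configured. The sketch below shows a hypothetical helper, start_ingestion, that chooses the wait flag accordingly. StandInManager is a local stand-in that mirrors only the documented attributes, run() signature, and ValueError rule. It is not the SDK class and performs no ingestion.

```python
from typing import Any, List


class StandInManager:
    """Stand-in with the documented attributes and run() signature (not the SDK class)."""

    def __init__(self, max_workers: int = 1, max_processes: int = 1):
        self.max_workers = max_workers
        self.max_processes = max_processes
        self.wait_flags: List[bool] = []

    def run(self, data_frame: Any, target_stores=None, wait: bool = True, timeout=None):
        # Mirrors only the documented ValueError rule; performs no ingestion.
        if not wait and self.max_workers == 1 and self.max_processes == 1:
            raise ValueError("wait=False requires more than one worker or process")
        self.wait_flags.append(wait)


def start_ingestion(manager: Any, data_frame: Any) -> bool:
    """Start ingestion and return True if the call had to block."""
    must_block = manager.max_workers == 1 and manager.max_processes == 1
    # Fall back to a blocking call in the one-worker, one-process configuration.
    manager.run(data_frame=data_frame, wait=must_block)
    return must_block


serial = StandInManager(max_workers=1, max_processes=1)
parallel = StandInManager(max_workers=4, max_processes=2)
serial_blocked = start_ingestion(serial, data_frame=[])
parallel_blocked = start_ingestion(parallel, data_frame=[])
```

Reading max_workers and max_processes from the manager keeps the decision next to the configuration that drives it.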

wait(timeout: int | float | None = None)

Wait for the ingestion process to finish.

Parameters:

timeout (Union[int, float]) – Time limit for the wait. concurrent.futures.TimeoutError is raised if the limit is reached. Defaults to None.
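
A caller that started ingestion without blocking might wrap wait() as sketched below. The helper wait_for_ingestion is hypothetical and uses only the wait() method and failed_rows property documented on this page. FinishedManager and SlowManager are local stand-ins so the example runs without the SDK. Any exception other than the timeout is left to propagate.

```python
import concurrent.futures
from typing import Any, List, Optional


def wait_for_ingestion(manager: Any, timeout: Optional[float] = None) -> Optional[List[int]]:
    """Wait for ingestion; return failed row indices, or None if the wait timed out."""
    try:
        manager.wait(timeout=timeout)
    except concurrent.futures.TimeoutError:
        return None
    return list(manager.failed_rows)


class FinishedManager:
    """Stand-in whose ingestion has completed with one failed row."""

    failed_rows = [7]

    def wait(self, timeout=None):
        return None


class SlowManager:
    """Stand-in whose wait() always exceeds the time limit."""

    failed_rows: List[int] = []

    def wait(self, timeout=None):
        raise concurrent.futures.TimeoutError()


finished_result = wait_for_ingestion(FinishedManager(), timeout=5)
timed_out_result = wait_for_ingestion(SlowManager(), timeout=0.1)
```

Returning None on timeout keeps "not finished yet" distinct from an empty list, which would mean ingestion finished with no failed rows.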