sagemaker.mlops.feature_store.feature_utils#
Utilities for working with FeatureGroups and FeatureStores.
Functions
|
Generate Hive DDL for a FeatureGroup's offline store table. |
|
Create an AthenaQuery for a FeatureGroup. |
|
Download query result file from S3. |
|
Download CSV from S3 and return as DataFrame. |
|
|
|
Get execution status of an Athena query. |
|
Get a Session from a region and optional IAM role. |
|
Ingest a pandas DataFrame to a FeatureGroup. |
Infer FeatureDefinitions from DataFrame dtypes. |
|
|
Prepares a dataframe to create a |
|
Execute Athena query, wait for completion, and return result. |
|
Start Athena query execution. |
|
Upload DataFrame to S3 as CSV. |
|
Wait for Athena query to finish. |
- sagemaker.mlops.feature_store.feature_utils.as_hive_ddl(feature_group_name: str, database: str = 'sagemaker_featurestore', table_name: str | None = None) str[source]#
Generate Hive DDL for a FeatureGroup’s offline store table.
Schema of the table is generated based on the feature definitions. Columns are named after feature name and data-type are inferred based on feature type. Integral feature type is mapped to INT data-type. Fractional feature type is mapped to FLOAT data-type. String feature type is mapped to STRING data-type.
- Parameters:
feature_group_name – Name of the FeatureGroup.
database – Hive database name (default: “sagemaker_featurestore”).
table_name – Hive table name (default: feature_group_name).
- Returns:
CREATE EXTERNAL TABLE DDL string.
- sagemaker.mlops.feature_store.feature_utils.create_athena_query(feature_group_name: str, session: Session)[source]#
Create an AthenaQuery for a FeatureGroup.
- Parameters:
feature_group_name – Name of the FeatureGroup.
session – Session instance for Athena boto calls.
- Returns:
AthenaQuery initialized with data catalog config.
- Raises:
RuntimeError – If no metastore is configured.
- sagemaker.mlops.feature_store.feature_utils.download_athena_query_result(session: Session, bucket: str, prefix: str, query_execution_id: str, filename: str)[source]#
Download query result file from S3.
- Parameters:
session – Session instance for boto calls.
bucket – S3 bucket name.
prefix – S3 key prefix.
query_execution_id – The query execution ID.
filename – Local filename to save to.
- sagemaker.mlops.feature_store.feature_utils.download_csv_from_s3(s3_uri: str, session: Session, kms_key: str | None = None) DataFrame[source]#
Download CSV from S3 and return as DataFrame.
- Parameters:
s3_uri – S3 URI of the CSV file.
session – Session instance for boto calls.
kms_key – KMS key for decryption (default: None).
- Returns:
DataFrame with CSV contents.
- sagemaker.mlops.feature_store.feature_utils.get_feature_group_as_dataframe(feature_group_name: str, athena_bucket: str, query: str = 'SELECT * FROM "sagemaker_featurestore"."#{table}"\n WHERE is_deleted=False ', role: str = None, region: str = None, session=None, event_time_feature_name: str = None, latest_ingestion: bool = True, verbose: bool = True, **kwargs) DataFrame[source]#
sagemaker.feature_store.feature_group.FeatureGroupaspandas.DataFrameExamples
>>> from sagemaker.mlops.feature_store.feature_utils import get_feature_group_as_dataframe >>> >>> region = "eu-west-1" >>> fg_data = get_feature_group_as_dataframe(feature_group_name="feature_group", >>> athena_bucket="s3://bucket/athena_queries", >>> region=region, >>> event_time_feature_name="EventTimeId" >>> ) >>> >>> type(fg_data) <class 'pandas.core.frame.DataFrame'>
- Description:
Method to run an athena query over a
sagemaker.feature_store.feature_group.FeatureGroupin a Feature Store to retrieve its data. It needs thesagemaker.session.Sessionlinked to a role or the region and/or role used to work with Feature Stores (it uses the module sagemaker.feature_store.feature_utils.get_session_from_role to get the session).
- Parameters:
region (str) – region of the target Feature Store
feature_group_name (str) – feature store name
query (str) – query to run. By default, it will take the latest ingest with data that wasn’t deleted. If latest_ingestion is False it will take all the data in the feature group that wasn’t deleted. It needs to use the keyword “#{table}” to refer to the FeatureGroup name. e.g.: ‘SELECT * FROM “sagemaker_featurestore”.”#{table}”’ It must not end by ‘;’.
athena_bucket (str) – Amazon S3 bucket for running the query
role (str) – role to be assumed to extract data from feature store. If not specified the default sagemaker execution role will be used.
session (str) –
sagemaker.session.Sessionof SageMaker used to work with the feature store. Optional, with role and region parameters it will infer the session.event_time_feature_name (str) – eventTimeId feature. Mandatory only if the latest ingestion is True.
latest_ingestion (bool) – if True it will get the data only from the latest ingestion. If False it will take whatever is specified in the query, or if not specify it, it will get all the data that wasn’t deleted.
verbose (bool) – if True show messages, if False is silent.
**kwargs (object) – key arguments used for the method pandas.read_csv to be able to have a better tuning on data. For more info read: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
- Returns:
dataset with the data retrieved from feature group
- Return type:
pandas.DataFrame
- sagemaker.mlops.feature_store.feature_utils.get_query_execution(session: Session, query_execution_id: str) Dict[str, Any][source]#
Get execution status of an Athena query.
- Parameters:
session – Session instance for boto calls.
query_execution_id – The query execution ID.
- Returns:
Response dict from Athena.
- sagemaker.mlops.feature_store.feature_utils.get_session_from_role(region: str, assume_role: str | None = None) Session[source]#
Get a Session from a region and optional IAM role.
- Parameters:
region – AWS region name.
assume_role – IAM role ARN to assume (default: None).
- Returns:
Session instance.
- sagemaker.mlops.feature_store.feature_utils.ingest_dataframe(feature_group_name: str, data_frame: DataFrame, max_workers: int = 1, max_processes: int = 1, wait: bool = True, timeout: int | float = None)[source]#
Ingest a pandas DataFrame to a FeatureGroup.
- Parameters:
feature_group_name – Name of the FeatureGroup.
data_frame – DataFrame to ingest.
max_workers – Threads per process (default: 1).
max_processes – Number of processes (default: 1).
wait – Wait for ingestion to complete (default: True).
timeout – Timeout in seconds (default: None).
- Returns:
IngestionManagerPandas instance.
- Raises:
ValueError – If max_workers or max_processes <= 0.
- sagemaker.mlops.feature_store.feature_utils.load_feature_definitions_from_dataframe(data_frame: DataFrame, online_storage_type: str | None = None) Sequence[FeatureDefinition][source]#
Infer FeatureDefinitions from DataFrame dtypes.
Column name is used as feature name. Feature type is inferred from the dtype of the column. Integer dtypes are mapped to Integral feature type. Float dtypes are mapped to Fractional feature type. All other dtypes are mapped to String.
For IN_MEMORY online_storage_type, collection type columns within DataFrame will be inferred as List instead of String.
- Parameters:
data_frame – DataFrame to infer features from.
online_storage_type – “Standard” or “InMemory” (default: None).
- Returns:
List of FeatureDefinition objects.
- sagemaker.mlops.feature_store.feature_utils.prepare_fg_from_dataframe_or_file(dataframe_or_path: str | Path | DataFrame, feature_group_name: str, role: str | None = None, region: str | None = None, session=None, record_id: str = 'record_id', event_id: str = 'data_as_of_date', verbose: bool = False, **kwargs) FeatureGroup[source]#
Prepares a dataframe to create a
sagemaker.feature_store.feature_group.FeatureGroup- Description:
Function to prepare a
pandas.DataFrameread from a path to a csv file or pass it directly to create asagemaker.feature_store.feature_group.FeatureGroup. The path to the file needs proper dtypes, feature names and mandatory features (record_id, event_id). It needs thesagemaker.session.Sessionlinked to a role or the region and/or role used to work with Feature Stores (it uses the module sagemaker.feature_store.feature_utils.get_session_from_role to get the session). If record_id or event_id are not specified it will create ones by default with the names ‘record_id’ and ‘data_as_of_date’.
- Parameters:
feature_group_name (str) – feature group name
dataframe_or_path (str, Path, pandas.DataFrame) – pandas.DataFrame or path to the data
verbose (bool) – True for displaying messages, False for silent method.
record_id (str, 'record_id') – (Optional) Feature identifier of the rows. If specified each value of that feature has to be unique. If not specified or record_id=’record_id’, then it will create a new feature from the index of the pandas.DataFrame.
event_id (str) – (Optional) Feature with the time of the creation of data rows. If not specified it will create one with the current time called data_as_of_date
role (str) – role used to get the session.
region (str) – region used to get the session.
session (str) – session of SageMaker used to work with the feature store
**kwargs (object) – key arguments used for the method pandas.read_csv to be able to have a better tuning on data. For more info read: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
- Returns:
FG prepared with all the methods and definitions properly defined
- Return type:
sagemaker.feature_store.feature_group.FeatureGroup
- sagemaker.mlops.feature_store.feature_utils.run_athena_query(session: Session, catalog: str, database: str, query_string: str, output_location: str, kms_key: str | None = None) Dict[str, Any][source]#
Execute Athena query, wait for completion, and return result.
- Parameters:
session – Session instance for boto calls.
catalog – Name of the data catalog.
database – Name of the database.
query_string – SQL query string.
output_location – S3 URI for query results.
kms_key – KMS key for encryption (default: None).
- Returns:
Query execution result dict.
- Raises:
RuntimeError – If query fails.
- sagemaker.mlops.feature_store.feature_utils.start_query_execution(session: Session, catalog: str, database: str, query_string: str, output_location: str, kms_key: str | None = None, workgroup: str | None = None) Dict[str, str][source]#
Start Athena query execution.
- Parameters:
session – Session instance for boto calls.
catalog – Name of the data catalog.
database – Name of the database.
query_string – SQL query string.
output_location – S3 URI for query results.
kms_key – KMS key for encryption (default: None).
workgroup – Athena workgroup name (default: None).
- Returns:
Response dict with QueryExecutionId.
- sagemaker.mlops.feature_store.feature_utils.upload_dataframe_to_s3(data_frame: DataFrame, output_path: str, session: Session, kms_key: str | None = None) tuple[str, str][source]#
Upload DataFrame to S3 as CSV.
- Parameters:
data_frame – DataFrame to upload.
output_path – S3 URI base path.
session – Session instance for boto calls.
kms_key – KMS key for encryption (default: None).
- Returns:
Tuple of (s3_folder, temp_table_name).
- sagemaker.mlops.feature_store.feature_utils.wait_for_athena_query(session: Session, query_execution_id: str, poll: int = 5)[source]#
Wait for Athena query to finish.
- Parameters:
session – Session instance for boto calls.
query_execution_id – The query execution ID.
poll – Polling interval in seconds (default: 5).