"""Custom Scorer Evaluator for SageMaker Model Evaluation Module.
This module provides evaluation capabilities using custom scorer metrics,
supporting both built-in preset metrics and custom evaluator implementations
for flexible model evaluation workflows.
"""
import logging
from enum import Enum
from typing import Any, Optional, Type, Union
from pydantic import validator
from .base_evaluator import BaseEvaluator
from .constants import EvalType
from .execution import EvaluationPipelineExecution
from sagemaker.core.telemetry.telemetry_logging import _telemetry_emitter
from sagemaker.core.telemetry.constants import Feature
_logger = logging.getLogger(__name__)
class _BuiltInMetric(str, Enum):
"""Internal: Preset metrics for custom scorer evaluation.
These metrics provide built-in evaluation capabilities for common use cases.
Note:
This is an internal class. Users should use ``get_builtin_metrics()`` instead.
"""
PRIME_MATH = "prime_math"
PRIME_CODE = "prime_code"
def get_builtin_metrics() -> Type[_BuiltInMetric]:
    """Get the built-in metrics enum for custom scorer evaluation.

    This utility function provides access to preset metrics for custom scorer
    evaluation without requiring users to import the internal enum directly.

    Returns:
        Type[_BuiltInMetric]: The built-in metric enum class.

    Example:
        .. code:: python

            from sagemaker.train.evaluate import get_builtin_metrics

            BuiltInMetric = get_builtin_metrics()
            evaluator = CustomScorerEvaluator(
                evaluator=BuiltInMetric.PRIME_MATH,
                dataset=my_dataset,
                base_model="my-model",
                s3_output_path="s3://bucket/output",
                mlflow_resource_arn="arn:..."
            )
    """
    # FIX: removed a stray Sphinx "[docs]" anchor line that was embedded in
    # the extracted source and would be a syntax error in the real module.
    return _BuiltInMetric
class CustomScorerEvaluator(BaseEvaluator):
    """Custom scorer evaluation job for preset or custom evaluator metrics.

    This evaluator supports both preset metrics (via built-in metrics enum) and
    custom evaluator implementations for specialized evaluation needs.

    Attributes:
        evaluator (Union[str, Any]): Built-in metric enum value, Evaluator object, or Evaluator
            ARN string. Required. Use ``get_builtin_metrics()`` for available preset metrics.
        dataset (Any): Dataset for evaluation. Required. Accepts S3 URI, Dataset ARN, or DataSet object.
        mlflow_resource_arn (Optional[str]): ARN of the MLflow tracking server for experiment tracking.
            Optional. If not provided, the system will attempt to resolve it using the default
            MLflow app experience (checks domain match, account default, or creates a new app).
            Inherited from BaseEvaluator.
        evaluate_base_model (bool): Whether to evaluate the base model in addition to the custom
            model. Set to True to also evaluate the base model. Defaults to False (only the
            custom model is evaluated).
        region (Optional[str]): AWS region. Inherited from BaseEvaluator.
        sagemaker_session (Optional[Any]): SageMaker session object. Inherited from BaseEvaluator.
        model (Union[str, Any]): Model for evaluation. Inherited from BaseEvaluator.
        base_eval_name (Optional[str]): Base name for evaluation jobs. Inherited from BaseEvaluator.
        s3_output_path (str): S3 location for evaluation outputs. Inherited from BaseEvaluator.
        mlflow_experiment_name (Optional[str]): MLflow experiment name. Inherited from BaseEvaluator.
        mlflow_run_name (Optional[str]): MLflow run name. Inherited from BaseEvaluator.
        networking (Optional[VpcConfig]): VPC configuration. Inherited from BaseEvaluator.
        kms_key_id (Optional[str]): KMS key ID for encryption. Inherited from BaseEvaluator.
        model_package_group (Optional[Union[str, ModelPackageGroup]]): Model package group.
            Inherited from BaseEvaluator.

    Example:
        .. code:: python

            from sagemaker.train.evaluate.custom_scorer_evaluator import (
                CustomScorerEvaluator,
                get_builtin_metrics
            )
            from sagemaker.ai_registry.evaluator import Evaluator

            # Using preset metric
            BuiltInMetric = get_builtin_metrics()
            evaluator = CustomScorerEvaluator(
                evaluator=BuiltInMetric.PRIME_MATH,
                dataset=my_dataset,
                base_model="my-model",
                s3_output_path="s3://bucket/output",
                mlflow_resource_arn="arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/my-server"
            )

            # Using custom evaluator
            my_evaluator = Evaluator.create(
                name="my-custom-evaluator",
                function_source="/path/to/evaluator.py",
                sub_type="AWS/Evaluator"
            )
            evaluator = CustomScorerEvaluator(
                evaluator=my_evaluator,
                dataset=my_dataset,
                base_model="my-model",
                s3_output_path="s3://bucket/output",
                mlflow_resource_arn="arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/my-server"
            )

            # Using evaluator ARN string
            evaluator = CustomScorerEvaluator(
                evaluator="arn:aws:sagemaker:us-west-2:123456789012:hub-content/AIRegistry/Evaluator/my-evaluator/1",
                dataset=my_dataset,
                base_model="my-model",
                s3_output_path="s3://bucket/output",
                mlflow_resource_arn="arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/my-server"
            )
            job = evaluator.evaluate()
    """

    evaluator: Union[str, Any]
    dataset: Any
    # Lazy cache backing the ``hyperparameters`` property; populated on first access.
    # NOTE(review): pydantic v1 treats underscore-prefixed names as non-fields;
    # assumes BaseEvaluator's config permits private attribute assignment — TODO confirm.
    _hyperparameters: Optional[Any] = None
    # Template-required fields
    # DOC FIX: the class docstring previously claimed this defaults to True,
    # contradicting the actual default below; the docstring now matches the code.
    evaluate_base_model: bool = False
@validator('dataset', pre=True)
def _resolve_dataset(cls, v):
    """Normalize the ``dataset`` field to a string (S3 URI or ARN).

    Delegates to BaseEvaluator's shared helper so the format validation
    logic lives in a single place.
    """
    resolved = BaseEvaluator._validate_and_resolve_dataset(v)
    return resolved
@validator('evaluator')
def _validate_evaluator(cls, v):
    """Validate evaluator parameter is a built-in metric, Evaluator object, or ARN string"""
    # Built-in metric enum values pass through unchanged.
    if isinstance(v, _BuiltInMetric):
        return v

    # Evaluator objects are recognized by their 'arn' attribute and
    # collapsed to the ARN string.
    if hasattr(v, 'arn'):
        _logger.info(f"Resolving Evaluator object to ARN: {v.arn}")
        return v.arn

    # Anything that is not a string at this point is unsupported.
    if not isinstance(v, str):
        raise ValueError(
            f"Invalid evaluator type: {type(v).__name__}. "
            f"Must be a built-in metric enum value, Evaluator object, or ARN string."
        )

    # Strings are either ARNs or built-in metric names.
    if v.startswith('arn:'):
        return v
    try:
        return _BuiltInMetric(v)
    except ValueError:
        raise ValueError(
            f"Invalid evaluator: '{v}'. Must be a built-in metric enum value, "
            f"Evaluator object, or valid Evaluator ARN. "
            f"Available built-in metrics: {', '.join(m.value for m in _BuiltInMetric)}"
        )
@property
@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="CustomScorerEvaluator.hyperparameters")
def hyperparameters(self):
    """Get evaluation hyperparameters as a FineTuningOptions object.

    Hyperparameters are lazily fetched from the JumpStart Hub on first
    access and cached on the instance; subsequent accesses return the
    cached object, so in-place edits persist.

    Returns:
        FineTuningOptions: Dynamic object with evaluation hyperparameters

    Raises:
        ValueError: If base model name is not available or if hyperparameters cannot be loaded

    Example:
        .. code:: python

            evaluator = CustomScorerEvaluator(...)
            print(evaluator.hyperparameters.temperature)   # read
            evaluator.hyperparameters.temperature = 0.5    # write (validated)
            params = evaluator.hyperparameters.to_dict()   # as dict
            evaluator.hyperparameters.get_info()           # describe all
            evaluator.hyperparameters.get_info('temperature')
    """
    # Return the cached object when already loaded.
    if self._hyperparameters is not None:
        return self._hyperparameters

    from ..common import FineTuningOptions
    from ..common_utils.recipe_utils import _get_evaluation_override_params, _extract_eval_override_options

    model_name = self._base_model_name
    if not model_name:
        raise ValueError(
            "Base model name not available. Cannot load hyperparameters. "
            "Ensure base_model is properly configured. "
            "The base_model parameter must be set to a valid model identifier (e.g., JumpStart model ID, "
            "model package ARN, or model ARN) to enable hyperparameter configuration."
        )

    _logger.info(f"Fetching evaluation override parameters for hyperparameters property")

    # HubContent.get() in recipe_utils expects a boto3 session, not a
    # sagemaker_core Session, so unwrap the boto session when present.
    session = self.sagemaker_session
    if hasattr(session, 'boto_session'):
        session = session.boto_session

    override_params = _get_evaluation_override_params(
        hub_content_name=model_name,
        hub_name="SageMakerPublicHub",
        evaluation_type="DeterministicEvaluation",
        region=self.region,
        session=session
    )
    # Full parameter specifications drive validation/info display downstream.
    param_specs = _extract_eval_override_options(override_params, return_full_spec=True)
    self._hyperparameters = FineTuningOptions(param_specs)
    return self._hyperparameters
def _resolve_evaluator_config(self) -> dict:
    """Resolve evaluator configuration (ARN vs preset).

    Returns:
        dict: Dictionary with:
            - evaluator_arn (Optional[str]): Custom evaluator ARN or None
            - preset_reward_function (Optional[str]): Preset function name or None
    """
    ev = self.evaluator
    arn = None
    preset = None
    if isinstance(ev, _BuiltInMetric):
        # Built-in metric enum: its value is the preset reward function name.
        preset = ev.value
    elif isinstance(ev, str):
        if ev.startswith('arn:'):
            # String ARN identifies a custom evaluator.
            arn = ev
        else:
            # Plain string: treated as a built-in metric name.
            preset = ev
    return {
        'evaluator_arn': arn,
        'preset_reward_function': preset
    }
def _get_custom_scorer_template_additions(self, evaluator_config: dict) -> dict:
    """Get custom scorer specific template context additions.

    Args:
        evaluator_config: Dictionary with evaluator_arn and preset_reward_function

    Returns:
        dict: Custom scorer specific template context fields
    """
    from ..common_utils.recipe_utils import _is_nova_model

    params = self.hyperparameters.to_dict()
    _logger.info(f"Using configured hyperparameters: {params}")

    # Nova models use 'metric'; OpenWeights models use 'evaluation_metric'.
    nova = _is_nova_model(self._base_model_name)

    context = {
        'task': 'gen_qa',       # Fixed task for custom scorer
        'strategy': 'gen_qa',   # Fixed strategy for gen_qa task
        ('metric' if nova else 'evaluation_metric'): "all",
        'evaluate_base_model': self.evaluate_base_model,
        'evaluator_arn': evaluator_config['evaluator_arn'],
    }
    if nova:
        context['lambda_type'] = 'rft'

    preset = evaluator_config['preset_reward_function']
    if preset:
        context['preset_reward_function'] = preset

    # Fold in every configured hyperparameter.
    context.update(params)

    # When a custom evaluator ARN is provided, postprocessing must be
    # enabled for Lambda execution; default aggregation to 'mean' when
    # unset or falsy.
    if evaluator_config['evaluator_arn']:
        context['postprocessing'] = 'True'
        if not context.get('aggregation'):
            context['aggregation'] = 'mean'
    return context
def _get_inference_params_from_hub(self, region: str) -> dict:
    """Fetch inference parameters from JumpStart Hub for the base model.

    This method retrieves the evaluation recipe override parameters from the hub
    and extracts the inference parameters (max_new_tokens, temperature, top_k, top_p).

    Args:
        region: AWS region

    Returns:
        Dict containing inference parameters as strings. Returns fallback values if fetch fails.
    """
    from ..common_utils.recipe_utils import _get_evaluation_override_params, _extract_eval_override_options

    # Default fallback values used whenever the hub lookup cannot complete.
    fallback_params = {
        'max_new_tokens': '8192',
        'temperature': '0',
        'top_k': '-1',
        'top_p': '1.0'
    }
    try:
        hub_content_name = self._base_model_name
        if not hub_content_name:
            # BUG FIX: was `logger.warning(...)` — `logger` is undefined in this
            # module (the module-level logger is `_logger`), so this branch
            # raised NameError and was silently absorbed by the broad except
            # below, logging a misleading "Failed to fetch" warning instead.
            _logger.warning("Base model name not available, using fallback inference parameters")
            return fallback_params
        # HubContent.get() in recipe_utils expects a boto3 session.
        session = self.sagemaker_session.boto_session if hasattr(self.sagemaker_session, 'boto_session') else None
        _logger.info(f"Fetching evaluation recipe override parameters from hub for model: {hub_content_name}")
        override_params = _get_evaluation_override_params(
            hub_content_name=hub_content_name,
            hub_name="SageMakerPublicHub",
            evaluation_type="DeterministicEvaluation",
            region=region,
            session=session
        )
        inference_params = _extract_eval_override_options(override_params)
        _logger.info(f"Successfully fetched inference parameters from hub: {inference_params}")
        return inference_params
    except Exception as e:
        # Best-effort: any failure falls back to the defaults above.
        _logger.warning(
            f"Failed to fetch inference parameters from hub for model '{self._base_model_name}': {e}. "
            f"Using fallback values: {fallback_params}"
        )
        return fallback_params
@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="CustomScorerEvaluator.evaluate")
def evaluate(self) -> EvaluationPipelineExecution:
    """Create and start a custom scorer evaluation job.

    Returns:
        EvaluationPipelineExecution: The created custom scorer evaluation execution

    Example:
        .. code:: python

            # DOC FIX: example previously referenced BuiltInMetric.CODE_EXECUTIONS,
            # which does not exist; valid members are PRIME_MATH and PRIME_CODE.
            evaluator = CustomScorerEvaluator(
                evaluator=BuiltInMetric.PRIME_MATH,
                dataset=my_dataset,
                base_model="my-model",
                s3_output_path="s3://bucket/output",
                mlflow_resource_arn="arn:..."
            )
            execution = evaluator.evaluate()
            execution.wait()
    """
    from .pipeline_templates import CUSTOM_SCORER_TEMPLATE, CUSTOM_SCORER_TEMPLATE_BASE_MODEL_ONLY

    # Get AWS execution context (role ARN, region, account ID).
    aws_context = self._get_aws_execution_context()
    # Resolve model artifacts for the execution region.
    artifacts = self._resolve_model_artifacts(aws_context['region'])
    # Get or infer model_package_group ARN (handles all cases internally).
    model_package_group_arn = self._get_model_package_group_arn()
    # Log resolved model information for debugging.
    _logger.info(f"Resolved model info - base_model_name: {self._base_model_name}, base_model_arn: {self._base_model_arn}, source_model_package_arn: {self._source_model_package_arn}")
    # Resolve evaluator configuration (custom ARN vs preset reward function).
    evaluator_config = self._resolve_evaluator_config()
    # Build base template context shared by all evaluators.
    template_context = self._get_base_template_context(
        role_arn=aws_context['role_arn'],
        region=aws_context['region'],
        account_id=aws_context['account_id'],
        model_package_group_arn=model_package_group_arn,
        resolved_model_artifact_arn=artifacts['resolved_model_artifact_arn']
    )
    template_context['dataset_uri'] = self.dataset
    # Merge custom scorer specific fields into the context.
    template_context.update(self._get_custom_scorer_template_additions(evaluator_config))
    # Add VPC and KMS configuration.
    template_context = self._add_vpc_and_kms_to_context(template_context)
    # Select the base-model-only or full-comparison template.
    template_str = self._select_template(
        CUSTOM_SCORER_TEMPLATE_BASE_MODEL_ONLY,
        CUSTOM_SCORER_TEMPLATE
    )
    # Render pipeline definition.
    pipeline_definition = self._render_pipeline_definition(template_str, template_context)
    # Generate execution name (FIX: dropped a no-op f-string prefix on the constant).
    name = self.base_eval_name or "custom-scorer-eval"
    # Start execution.
    return self._start_execution(
        eval_type=EvalType.CUSTOM_SCORER,
        name=name,
        pipeline_definition=pipeline_definition,
        role_arn=aws_context['role_arn'],
        region=aws_context['region']
    )
@classmethod
@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="CustomScorerEvaluator.get_all")
def get_all(cls, session: Optional[Any] = None, region: Optional[str] = None):
    """Get all custom scorer evaluation executions.

    Uses ``EvaluationPipelineExecution.get_all()`` to retrieve all custom scorer
    evaluation executions as an iterator.

    Args:
        session (Optional[Any]): Optional boto3 session. If not provided, will be inferred.
        region (Optional[str]): Optional AWS region. If not provided, will be inferred.

    Yields:
        EvaluationPipelineExecution: Custom scorer evaluation execution instances

    Example:
        .. code:: python

            # Get all custom scorer evaluations as iterator
            evaluations = CustomScorerEvaluator.get_all()
            all_executions = list(evaluations)

            # Or iterate directly
            for execution in CustomScorerEvaluator.get_all():
                print(f"{execution.name}: {execution.status.overall_status}")

            # With specific session/region
            evaluations = CustomScorerEvaluator.get_all(session=my_session, region='us-west-2')
            all_executions = list(evaluations)
    """
    # FIX: removed a stray Sphinx "[docs]" anchor line embedded in the
    # extracted source. Delegate to the generator returned by
    # EvaluationPipelineExecution.get_all() so callers can iterate lazily.
    yield from EvaluationPipelineExecution.get_all(
        eval_type=EvalType.CUSTOM_SCORER,
        session=session,
        region=region
    )