sagemaker.core.utils.logs#
Classes
- CloudWatchLogsClient: A singleton class for creating a CloudWatchLogs client.
- LogStreamHandler
- MultiLogStreamHandler
- class sagemaker.core.utils.logs.CloudWatchLogsClient(*args, **kwargs)[source]#
Bases: object
A singleton class for creating a CloudWatchLogs client.
- client: botocore.client = None#
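The singleton behavior described above can be sketched as follows. This is a minimal illustration, not the library's implementation: the class name `SingletonLogsClient`, the `created` counter, and the factory `_make_client` are hypothetical, and the factory returns a placeholder object so the sketch runs without AWS credentials (a real version would presumably call `boto3.client("logs")`).

```python
class SingletonLogsClient:
    """Hypothetical singleton wrapper; not the sagemaker class itself."""

    _instance = None
    client = None
    created = 0  # counts how many times the underlying client was built

    def __new__(cls, *args, **kwargs):
        # Build the instance (and its client) only on the first call.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.client = cls._make_client()
        return cls._instance

    @classmethod
    def _make_client(cls):
        # Assumption: placeholder object; real code would likely use
        # boto3.client("logs") here.
        cls.created += 1
        return object()


first = SingletonLogsClient()
second = SingletonLogsClient()
print(first is second)                # True
print(first.client is second.client)  # True
```

Every construction returns the same instance, so all handlers in a process would share one underlying client.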
- class sagemaker.core.utils.logs.LogStreamHandler(log_group_name: str, log_stream_name: str, stream_id: int)[source]#
Bases: object
- cw_client = None#
- get_latest_log_events() → Generator[Tuple[str, dict], None, None][source]#
Gets all log events that are currently available for this stream.
cw_client.get_log_events() always returns a nextForwardToken, even if the current batch of events is empty. You can keep calling cw_client.get_log_events() with the same token until a new batch of log events exists.
API Reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/get_log_events.html
- Returns:
- Generator that yields a tuple consisting of two values:
str: stream_name, dict: event dict in the format
- {
"ingestionTime": number, "message": "string", "timestamp": number
}
- Return type:
Generator[tuple[str, dict], None, None]
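The token-based polling described for this method can be sketched as below. This is a hedged illustration under stated assumptions, not the library's source: `PollingStreamReader` and `FakeLogsClient` are hypothetical names, and the fake client only mimics the `events` and `nextForwardToken` fields of a `get_log_events` response so the example runs without AWS access.

```python
from typing import Dict, Generator, List, Optional, Tuple


class FakeLogsClient:
    """Hypothetical stand-in for a CloudWatch Logs client (no AWS calls)."""

    def __init__(self, pages: List[List[Dict]]):
        self.pages = pages

    def get_log_events(self, **kwargs) -> Dict:
        # Tokens in this fake are page indexes encoded as strings.
        index = int(kwargs.get("nextToken", "0"))
        if index < len(self.pages):
            return {"events": self.pages[index], "nextForwardToken": str(index + 1)}
        # Like the real API, an empty batch still carries a forward token.
        return {"events": [], "nextForwardToken": str(index)}


class PollingStreamReader:
    """Hypothetical reader that remembers its forward token between calls."""

    def __init__(self, client, log_group_name: str, log_stream_name: str):
        self.client = client
        self.log_group_name = log_group_name
        self.log_stream_name = log_stream_name
        self.next_token: Optional[str] = None

    def get_latest_log_events(self) -> Generator[Tuple[str, Dict], None, None]:
        while True:
            kwargs = {
                "logGroupName": self.log_group_name,
                "logStreamName": self.log_stream_name,
                "startFromHead": True,
            }
            if self.next_token:
                kwargs["nextToken"] = self.next_token
            response = self.client.get_log_events(**kwargs)
            self.next_token = response["nextForwardToken"]
            if not response["events"]:
                break  # nothing new right now; keep the token for next time
            for event in response["events"]:
                yield self.log_stream_name, event


client = FakeLogsClient([[{"message": "a"}], [{"message": "b"}]])
reader = PollingStreamReader(client, "example-group", "example-stream")
first_batch = [event["message"] for _, event in reader.get_latest_log_events()]
second_batch = list(reader.get_latest_log_events())  # empty: no new events yet
client.pages.append([{"message": "c"}])
third_batch = [event["message"] for _, event in reader.get_latest_log_events()]
```

Because the reader stores the last forward token, each call resumes where the previous one stopped instead of re-reading the stream from the start.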
- log_group_name: str = None#
- log_stream_name: str = None#
- next_token: str = None#
- stream_id: int = None#
- class sagemaker.core.utils.logs.MultiLogStreamHandler(log_group_name: str, log_stream_name_prefix: str, expected_stream_count: int)[source]#
Bases: object
- cw_client = None#
- expected_stream_count: int = None#
- get_latest_log_events() → Generator[Tuple[str, dict], None, None][source]#
Gets all log events that are currently available from each stream.
- Returns:
- Generator that yields a tuple consisting of two values:
str: stream_name, dict: event dict in the format
- {
"ingestionTime": number, "message": "string", "timestamp": number
}
- Return type:
Generator[tuple[str, dict], None, None]
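A caller might consume the `(stream_name, event)` tuples from several streams roughly as follows. This is a sketch under assumptions rather than the library's code: `StubStreamHandler`, `latest_from_all`, and `format_event` are hypothetical helpers, and the stub yields pre-recorded events instead of calling CloudWatch. The only API fact relied on is that CloudWatch Logs timestamps are expressed in milliseconds since the Unix epoch.

```python
from datetime import datetime, timezone
from typing import Dict, Generator, Iterable, List, Tuple

Event = Tuple[str, Dict]


class StubStreamHandler:
    """Hypothetical stand-in that yields each pre-recorded event once."""

    def __init__(self, stream_name: str, events: List[Dict]):
        self.stream_name = stream_name
        self._pending = list(events)

    def get_latest_log_events(self) -> Generator[Event, None, None]:
        while self._pending:
            yield self.stream_name, self._pending.pop(0)


def latest_from_all(handlers: Iterable[StubStreamHandler]) -> Generator[Event, None, None]:
    # Visit the streams in order and forward whatever each one has right now.
    for handler in handlers:
        yield from handler.get_latest_log_events()


def format_event(stream_name: str, event: Dict) -> str:
    # Convert the millisecond timestamp to an ISO 8601 string in UTC.
    when = datetime.fromtimestamp(event["timestamp"] / 1000, tz=timezone.utc)
    return f"[{stream_name}] {when.isoformat()} {event['message']}"


handlers = [
    StubStreamHandler("algo-1", [{"ingestionTime": 0, "message": "start", "timestamp": 0}]),
    StubStreamHandler("algo-2", [{"ingestionTime": 1000, "message": "ready", "timestamp": 1000}]),
]
lines = [format_event(name, event) for name, event in latest_from_all(handlers)]
for line in lines:
    print(line)
```

Tagging each event with its stream name lets the caller tell interleaved output from different streams apart.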
- log_group_name: str = None#
- log_stream_name_prefix: str = None#
- ready() → bool[source]#
Checks whether MultiLogStreamHandler is ready to serve new log events at this moment.
If self.streams is already set, returns True. Otherwise, checks whether the current number of log streams in the log group matches the expected stream count.
- Returns:
Whether or not MultiLogStreamHandler is ready to serve new log events.
- Return type:
bool
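The readiness check described above might look roughly like this. It is a sketch under stated assumptions, not the library's implementation: `ReadinessSketch` and `FakeLogsClient` are hypothetical, the fake only mimics the `logStreams` field of a `describe_log_streams` response, pagination is ignored, and the use of strict equality follows the word "matches" in the description (whether the library itself uses `==` or `>=` is not stated here).

```python
from typing import Dict, List


class FakeLogsClient:
    """Hypothetical stand-in for a CloudWatch Logs client (no AWS calls)."""

    def __init__(self):
        self.stream_names: List[str] = []

    def describe_log_streams(self, **kwargs) -> Dict:
        prefix = kwargs.get("logStreamNamePrefix", "")
        matching = [name for name in self.stream_names if name.startswith(prefix)]
        return {"logStreams": [{"logStreamName": name} for name in matching]}


class ReadinessSketch:
    """Hypothetical handler that caches stream names once all are present."""

    def __init__(self, client, log_group_name: str,
                 log_stream_name_prefix: str, expected_stream_count: int):
        self.client = client
        self.log_group_name = log_group_name
        self.log_stream_name_prefix = log_stream_name_prefix
        self.expected_stream_count = expected_stream_count
        self.streams: List[str] = []

    def ready(self) -> bool:
        if self.streams:
            return True  # already discovered; skip the API call
        response = self.client.describe_log_streams(
            logGroupName=self.log_group_name,
            logStreamNamePrefix=self.log_stream_name_prefix,
        )
        names = [s["logStreamName"] for s in response["logStreams"]]
        # Assumption: "matches" is read as strict equality.
        if len(names) != self.expected_stream_count:
            return False
        self.streams = names
        return True


client = FakeLogsClient()
handler = ReadinessSketch(client, "example-group", "job/", expected_stream_count=2)
before = handler.ready()           # no streams exist yet
client.stream_names = ["job/algo-1", "job/algo-2"]
after = handler.ready()            # both expected streams are present
client.stream_names.append("job/algo-3")
cached = handler.ready()           # streams are cached, so still ready
```

Caching the discovered streams means later calls return immediately without querying the log group again.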
- streams: List[LogStreamHandler] = []#