Source code for sagemaker.core.tools.resources_extractor

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""A class for extracting resource information from a service JSON."""
from typing import Optional

import pandas as pd

from sagemaker.core.utils.utils import get_textual_rich_logger
from sagemaker.core.tools.constants import CLASS_METHODS, OBJECT_METHODS
from sagemaker.core.tools.data_extractor import (
    load_additional_operations_data,
    load_combined_operations_data,
    load_combined_shapes_data,
)
from sagemaker.core.tools.method import Method

log = get_textual_rich_logger(__name__)
"""
This class is used to extract the resources and its actions from the service-2.json file.
"""


[docs] class ResourcesExtractor: """ A class for extracting resource information from a service JSON. Args: service_json (dict): The Botocore service.json containing the shape definitions. Attributes: service_json (dict): The service JSON containing operations and shapes. operations (dict): The operations defined in the service JSON. shapes (dict): The shapes defined in the service JSON. resource_actions (dict): A dictionary mapping resources to their associated actions. actions_under_resource (set): A set of actions that are performed on resources. create_resources (set): A set of resources that can be created. add_resources (set): A set of resources that can be added. start_resources (set): A set of resources that can be started. register_resources (set): A set of resources that can be registered. import_resources (set): A set of resources that can be imported. resources (set): A set of all resources. df (DataFrame): A DataFrame containing resource information. Methods: _filter_actions_for_resources(resources): Filters actions based on the given resources. _extract_resources_plan(): Extracts the resource plan from the service JSON. _get_status_chain_and_states(shape_name, status_chain): Recursively extracts the status chain and states for a given shape. _extract_resource_plan_as_dataframe(): Builds a DataFrame containing resource information. get_resource_plan(): Returns the resource plan DataFrame. """ RESOURCE_TO_ADDITIONAL_METHODS = { "Cluster": ["DescribeClusterNode", "ListClusterNodes"], } def __init__( self, combined_shapes: Optional[dict] = None, combined_operations: Optional[dict] = None, ): """ Initializes a ResourceExtractor object. Args: service_json (dict): The service JSON containing operations and shapes. """ self.operations = combined_operations or load_combined_operations_data() self.shapes = combined_shapes or load_combined_shapes_data() self.additional_operations = load_additional_operations_data() # contains information about additional methods only now. # TODO: replace resource_actions with resource_methods to include all methods self.resource_methods = {} self.resource_actions = {} self.actions_under_resource = set() self._extract_resources_plan() def _filter_additional_operations(self): """ Extracts information from additional operations defined in additional_operations.json Returns: None """ for resource_name, resource_operations in self.additional_operations.items(): self.resources.add(resource_name) if resource_name not in self.resource_methods: self.resource_methods[resource_name] = dict() for operation_name, operation in resource_operations.items(): self.actions_under_resource.add(operation_name) method = Method(**operation) method.get_docstring_title(self.operations[operation_name]) self.resource_methods[resource_name][operation["method_name"]] = method self.actions.remove(operation_name)
[docs] def _filter_actions_for_resources(self, resources): """ Filters actions based on the given resources. Args: resources (set): A set of resources. Returns: None """ for resource in sorted(resources, key=len, reverse=True): filtered_actions = set( [ a for a in self.actions if a.endswith(resource) or (a.startswith("List") and a.endswith(resource + "s")) or a.startswith("Invoke" + resource) ] ) self.actions_under_resource.update(filtered_actions) self.resource_actions[resource] = filtered_actions self.actions = self.actions - filtered_actions
[docs] def _extract_resources_plan(self): """ Extracts the resource plan from the service JSON. Returns: None """ self.actions = set(self.operations.keys()) log.info(f"Total actions - {len(self.actions)}") # Filter out additional operations and resources first self.resources = set() self._filter_additional_operations() self.create_resources = set( [key[len("Create") :] for key in self.actions if key.startswith("Create")] ) self.add_resources = set( [key[len("Add") :] for key in self.actions if key.startswith("Add")] ) self.start_resources = set( [key[len("Start") :] for key in self.actions if key.startswith("Start")] ) self.register_resources = set( [key[len("Register") :] for key in self.actions if key.startswith("Register")] ) self.import_resources = set( [key[len("Import") :] for key in self.actions if key.startswith("Import")] ) self.resources.update( self.create_resources | self.add_resources | self.start_resources | self.register_resources | self.import_resources ) self._filter_actions_for_resources(self.resources) log.info(f"Total resource - {len(self.resources)}") log.info(f"Supported actions - {len(self.actions_under_resource)}") log.info(f"Unsupported actions - {len(self.actions)}") self._extract_resource_plan_as_dataframe()
[docs] def get_status_chain_and_states(self, resource_name): """ Extract the status chain and states for a given resource. Args: resource_name (str): The name of the resource Returns: status_chain (list): The status chain for the resource. resource_states (list): The states associated with the resource. """ resource_operation = self.operations["Describe" + resource_name] resource_operation_output_shape_name = resource_operation["output"]["shape"] output_members_data = self.shapes[resource_operation_output_shape_name]["members"] if len(output_members_data) == 1: single_member_name = next(iter(output_members_data)) single_member_shape_name = output_members_data[single_member_name]["shape"] status_chain = [] status_chain.append( {"name": single_member_name, "shape_name": single_member_shape_name} ) resource_status_chain, resource_states = self._get_status_chain_and_states( single_member_shape_name, status_chain ) else: resource_status_chain, resource_states = self._get_status_chain_and_states( resource_operation_output_shape_name ) return resource_status_chain, resource_states
[docs] def _get_status_chain_and_states(self, shape_name, status_chain: list = None): """ Recursively extracts the status chain and states for a given shape. Args: shape_name (str): The name of the shape. status_chain (list): The current status chain. Returns: status_chain (list): The status chain for the shape. resource_states (list): The states associated with the shape. """ if status_chain is None: status_chain = [] member_data = self.shapes[shape_name]["members"] status_name = next((member for member in member_data if "status" in member.lower()), None) if status_name is None: return [], [] status_shape_name = member_data[status_name]["shape"] status_chain.append({"name": status_name, "shape_name": status_shape_name}) if "enum" in self.shapes[status_shape_name]: resource_states = self.shapes[status_shape_name]["enum"] return status_chain, resource_states else: status_chain, resource_states = self._get_status_chain_and_states( status_shape_name, status_chain ) return status_chain, resource_states
[docs] def _extract_resource_plan_as_dataframe(self): """ Builds a DataFrame containing resource information. Returns: None """ self.df = pd.DataFrame( columns=[ "resource_name", "type", "class_methods", "object_methods", "chain_resource_name", "additional_methods", "raw_actions", "resource_status_chain", "resource_states", ] ) for resource, actions in sorted(self.resource_actions.items()): class_methods = set() object_methods = set() additional_methods = set() chain_resource_names = set() resource_status_chain = set() resource_states = set() for action in actions: action_low = action.lower() resource_low = resource.lower() if action_low.split(resource_low)[0] == "describe": class_methods.add("get") object_methods.add("refresh") output_shape_name = self.operations[action]["output"]["shape"] output_members_data = self.shapes[output_shape_name]["members"] resource_status_chain, resource_states = self.get_status_chain_and_states( resource ) if resource_low.endswith("job") or resource_low.endswith("jobv2"): object_methods.add("wait") elif resource_states and resource_low != "action": object_methods.add("wait_for_status") if "Deleting" in resource_states or "DELETING" in resource_states: object_methods.add("wait_for_delete") continue if action_low.split(resource_low)[0] == "create": shape_name = self.operations[action]["input"]["shape"] input = self.shapes[shape_name] for member in input["members"]: if member.endswith("Name") or member.endswith("Names"): chain_resource_name = member[: -len("Name")] if ( chain_resource_name != resource and chain_resource_name in self.resources ): chain_resource_names.add(chain_resource_name) action_split = action_low.split(resource_low) if action_split[0] in CLASS_METHODS: if action_low.split(resource_low)[0] == "list": class_methods.add("get_all") else: class_methods.add(action_low.split(resource_low)[0]) elif action_split[0] in OBJECT_METHODS: object_methods.add(action_split[0]) else: additional_methods.add(action) if resource in self.RESOURCE_TO_ADDITIONAL_METHODS: additional_methods.update(self.RESOURCE_TO_ADDITIONAL_METHODS[resource]) new_row = pd.DataFrame( { "resource_name": [resource], "type": ["resource"], "class_methods": [list(sorted(class_methods))], "object_methods": [list(sorted(object_methods))], "chain_resource_name": [list(sorted(chain_resource_names))], "additional_methods": [list(sorted(additional_methods))], "raw_actions": [list(sorted(actions))], "resource_status_chain": [list(resource_status_chain)], "resource_states": [list(resource_states)], } ) self.df = pd.concat([self.df, new_row], ignore_index=True) self.df.to_csv("resource_plan.csv", index=False)
[docs] def get_resource_plan(self): """ Returns the resource plan DataFrame. Returns: df (DataFrame): The resource plan DataFrame. """ return self.df
[docs] def get_resource_methods(self): """ Returns the resource methods dict. Returns: resource_methods (dict): The resource methods dict. """ return self.resource_methods