Source code for openapi_client.amorphic_api_client

#!/usr/bin/env python3
"""
###########################################################################################################
# File: amorphic_api_client.py
# Location: /amorphic_client/amorphic_api_client.py
#
# This module provides the AmorphicApiClient class which extends the base OpenAPI ApiClient
# to provide enhanced functionality for interacting with the Amorphic Data Platform APIs.
# It includes system information retrieval, API version compatibility checking, authentication,
# role management, and comprehensive logging capabilities with optional custom logger support.
#
# Modification History:
# ===================================================================
# Date                 Who                       Description
# ==========      =================     ==============================
#
# Jun 2025        Subir Adhikari        Initial version of Amorphic SDK client
#                                       with logging, version checking, and
#                                       optional custom logger support
#
# Oct 2025        Jeebu Abraham         Added usage stats tracking
###########################################################################################################
"""

import json
import logging
import os
import sys
import time
import functools
import concurrent.futures
import atexit
import pkgutil
from typing import Optional, Any, Dict, List

from packaging.version import parse as version_parse
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from openapi_client.api_client import ApiClient
from openapi_client.configuration import Configuration
from openapi_client.api.management_api import ManagementApi
from openapi_client.models.post_app_usage_body import PostAppUsageBody
from openapi_client.rest import ApiException

# Set up module-level logging.
# NOTE(review): calling logging.basicConfig() at import time configures the
# process-wide root logger, which is an anti-pattern for library code — it can
# override the host application's logging setup. Consider leaving configuration
# to the application and only creating the module logger here; confirm no
# callers depend on this side effect before changing it.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global constant for methods that should be excluded from usage stats tracking
# Global constant for methods that should be excluded from usage stats tracking.
# Contains two kinds of entries: business methods deliberately not tracked, and
# dunder/special methods excluded from interception to avoid recursion or
# interfering with normal object behavior.
EXCLUDED_METHODS = {
    # Methods that should not be tracked for usage stats
    'update_data_load_throttling_details',
    'revoke_resource_access',
    'generate_payload_for_resource',
    # Methods to exclude from interception to prevent recursion
    'send_app_usage',  # Prevent recursion when sending usage stats
    'delete_component_guard_rail_configuration',  # Exclude from usage stats tracking
    '__init__', '__new__', '__del__', '__repr__', '__str__', '__eq__', '__ne__',
    '__lt__', '__le__', '__gt__', '__ge__', '__hash__', '__bool__', '__len__',
    '__getitem__', '__setitem__', '__delitem__', '__iter__', '__next__',
    '__enter__', '__exit__', '__call__', '__getattr__', '__setattr__', '__delattr__',
    '__getattribute__', '__dir__', '__class__', '__module__', '__doc__',
    '__weakref__', '__dict__', '__slots__', '__annotations__', '__qualname__'
}


# def _discover_api_modules():
#     """
#     Dynamically discover all API modules in openapi_client.api package.
#     Returns:
#         List[str]: List of module paths (e.g., ['openapi_client.api.ai_api', ...])
#     """
#     api_module_paths = []
#     base_package = 'openapi_client.api'

#     try:
#         # Import the api package to get its path
#         import openapi_client.api
#         package_path = os.path.dirname(openapi_client.api.__file__)

#         # Discover all modules in the api package
#         for importer, modname, ispkg in pkgutil.iter_modules([package_path]):
#             # Only include modules that end with '_api' (e.g., 'ai_api', 'datasets_api')
#             # Skip packages and __init__ modules
#             if not ispkg and modname != '__init__' and modname.endswith('_api'):
#                 full_module_path = f'{base_package}.{modname}'
#                 api_module_paths.append(full_module_path)

#         logger.debug(f"Discovered {len(api_module_paths)} API modules: {api_module_paths}")
#         return api_module_paths

#     except (ImportError, AttributeError) as e:
#         logger.warning(f"Could not discover API modules dynamically: {e}. Falling back to empty list.")
#         return []

# # Dynamically discover API modules at import time
# # This runs when the module is first imported, so all API classes are ready to be patched
# API_MODULE_PATHS = _discover_api_modules()

# def _create_patched_init(original_init):
#     """
#     Factory function to create a patched __init__ method for API classes.

#     This function uses a closure pattern to create a wrapper around an API class's
#     original __init__ method. The wrapper preserves the original initialization
#     behavior while adding automatic usage stats tracking when an AmorphicApiClient
#     instance is passed as the first argument.

#     How It Works:
#     1. Takes the original __init__ method of an API class (e.g., DatasetsApi.__init__)
#     2. Creates a closure that captures this specific original_init
#     3. Returns a new function that:
#        - First calls the original __init__ to maintain normal initialization
#        - Then checks if the first argument is an AmorphicApiClient
#        - If so, automatically sets up usage stats tracking via _intercept_api_methods()

#     Why Closure is Needed:
#     Each API class (DatasetsApi, AIApi, etc.) has a different __init__ method.
#     The closure ensures that each patched_init function captures and calls
#     its own specific original_init, preventing mix-ups between classes.

#     Args:
#         original_init: The original __init__ method from an API class to be wrapped.
#                        This should be the actual method object, not a string name.

#     Returns:
#         patched_init: A wrapper function with the same signature as the original
#                      __init__ that:
#                      - Calls original_init(self, *args, **kwargs) first
#                      - Then sets up usage stats tracking if AmorphicApiClient detected

#     Example:
#         # During patching (called by _patch_api_classes):
#         original = DatasetsApi.__init__
#         patched = _create_patched_init(original)
#         DatasetsApi.__init__ = patched

#         # When user creates an API instance:
#         client = AmorphicApiClient(...)
#         api = DatasetsApi(client)  # Now automatically tracked!

#         # What happens inside:
#         # 1. patched_init(self=api_instance, args=(client,), kwargs={})
#         # 2. original(self, client) is called → normal DatasetsApi initialization
#         # 3. Checks: args[0] == client and hasattr(client, '_intercept_api_methods')
#         # 4. Calls: client._intercept_api_methods(api_instance)
#         #    → All methods of api_instance now track usage stats

#     Note:
#         - The closure captures original_init at function creation time
#         - Each call to _create_patched_init() creates a unique closure
#         - This ensures DatasetsApi gets its own original_init, AIApi gets its own, etc.
#         - If no AmorphicApiClient is detected, initialization proceeds normally
#           without usage stats tracking (backwards compatible)
#     """
#     # Nested function is necessary for closure - each API class needs its own
#     # patched_init that captures its specific original_init
#     def patched_init(self, *args, **kwargs):
#         # Step 1: Call the original __init__ first to ensure normal initialization
#         # This preserves all the original behavior of the API class
#         original_init(self, *args, **kwargs)
        
#         # Step 2: Check if first argument is an AmorphicApiClient instance
#         # We check args[0] because API classes are typically instantiated as:
#         #   DatasetsApi(amorphic_api_client) where client is the first positional arg
#         if args and hasattr(args[0], '_intercept_api_methods'):
#             # Step 3: Automatically set up usage stats tracking for all methods of this API instance
#             # This replaces all public methods with wrapped versions that track usage
#             args[0]._intercept_api_methods(self)
#             args[0].logger.debug(f"Auto-intercepted API class: {self.__class__.__name__}")

#     return patched_init

# def _patch_api_classes():
#     """
#     Monkey patch API classes to automatically set up usage stats tracking.

#     This function patches the __init__ methods of all discovered API classes
#     (e.g., DatasetsApi, AIApi, ManagementApi) to automatically set up usage
#     stats tracking when they are instantiated with an AmorphicApiClient instance.

#     Key Steps:
#     1. Iterate through all dynamically discovered API module paths
#     2. Extract and convert module names to class names (snake_case to PascalCase)
#     3. Import each API class module
#     4. Patch the __init__ method to call _create_patched_init() wrapper
#     5. When API class is instantiated, the patched __init__ automatically
#        calls _intercept_api_methods() to set up usage stats tracking
#     Example:
#         Before patching:
#             api_instance = DatasetsApi(client)  # No usage tracking

#         After patching:
#             api_instance = DatasetsApi(client)  # Automatically tracks usage
#     Returns:
#         None: Patches are applied in-place to class __init__ methods
#     Raises:
#         None: All exceptions are caught and logged, patching continues for
#               remaining classes even if some fail
#     Note:
#         - Patches are applied once at module import time
#         - Each API class's __init__ is only patched once (checked via _original_init)
#         - If an API class cannot be imported, it is silently skipped
#     """

#     # Iterate through all discovered API modules (e.g., 'openapi_client.api.datasets_api')
#     for module_path in API_MODULE_PATHS:
#         module = None
#         try:
#             # Step 1: Convert module name to class name (snake_case → PascalCase)
#             # Example: 'datasets_api' → 'DatasetsApi'
#             module_name = module_path.split('.')[-1]  # Extract 'datasets_api' from full path
#             class_name = ''.join(word.capitalize() for word in module_name.split('_'))

#             # Step 2: Dynamically import the API class module
#             # This imports the module without executing it at import time
#             # IMPORTANT: Import the module first to ensure it's available even if patching fails
#             # The __import__ call adds the module to sys.modules, making it available for future imports
#             module = __import__(module_path, fromlist=[class_name])

#             # Verify module was imported successfully (it's now in sys.modules)
#             # This ensures the module is available even if patching fails later
#             if module_path not in sys.modules:
#                 logger.warning(f"Module {module_path} was not added to sys.modules after import")

#             # Step 3: Attempt to get the API class and patch it
#             # If this fails, the module is still imported and usable without the patch
#             try:
#                 api_class = getattr(module, class_name)

#                 # Step 4: Check if this class has already been patched
#                 # We store _original_init to prevent patching the same class twice
#                 if not hasattr(api_class, '_original_init'):
#                     # Store the original __init__ method before we replace it
#                     # This allows us to call the original method from the wrapper
#                     api_class._original_init = api_class.__init__

#                     # Step 5: Replace __init__ with our patched version
#                     # The patched version will call the original, then set up usage tracking
#                     api_class.__init__ = _create_patched_init(api_class._original_init)
#             except (AttributeError, TypeError) as patch_error:
#                 # If patching fails for any reason (missing class, attribute error, etc.),
#                 # log a warning but continue - the module is still imported and usable
#                 logger.warning(
#                     f"Failed to patch {class_name} from {module_path}: {patch_error}. "
#                     f"Module will be available without patching."
#                 )
#                 continue

#         except ImportError:
#             # If an API class can't be imported (maybe it doesn't exist in this SDK version),
#             # silently skip it and continue patching other classes
#             continue
#         except Exception as e:
#             # Catch any other unexpected exceptions during module import or processing
#             # Log a warning but ensure we don't break the entire patching process
#             logger.warning(
#                 f"Unexpected error processing {module_path}: {e}. "
#                 f"Module may not be patched but will be available if imported."
#             )
#             continue

# # Apply patches when module is imported
# # This runs once when amorphic_api_client.py is first imported by any script
# _patch_api_classes()


class TokenFetcher:
    """Utility class to fetch PAT tokens from various sources for local development"""

    def __init__(self, aws_profile: Optional[str] = None, logger: Optional[logging.Logger] = None):
        """
        Create a token fetcher that can pull tokens from several sources.

        Args:
            aws_profile (Optional[str]): AWS profile name to use for local
                development credential resolution.
            logger (Optional[logging.Logger]): Logger used by all fetch paths;
                a module-level logger is created when none is supplied.

        Returns:
            TokenFetcher: Initialized token fetcher instance.

        Raises:
            None: No external dependencies are touched here.
        """
        # Keep a logger handy so every fetch helper logs uniformly.
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        self.aws_profile = aws_profile
def _get_boto3_client(self, service_name: str) -> Any: """ Get boto3 client with appropriate configuration for AWS services. Key Steps: 1. Check if AWS profile is specified for local development 2. Create boto3 session with profile or default credentials 3. Create client for the specified AWS service 4. Handle credential errors gracefully Args: service_name (str): AWS service name (e.g., 'ssm', 'secretsmanager') Returns: boto3 client instance: Configured client for the specified AWS service Raises: NoCredentialsError: If AWS credentials are not configured """ try: if self.aws_profile: self.logger.info("Creating %s client with profile: %s", service_name, self.aws_profile) session = boto3.Session(profile_name=self.aws_profile) return session.client(service_name) self.logger.info("Creating %s client with default credentials", service_name) return boto3.client(service_name) except NoCredentialsError as e: self.logger.error("AWS credentials not configured. Please run 'aws configure' or set environment variables") raise NoCredentialsError( "AWS credentials not configured. Please run 'aws configure' or set environment variables" ) from e
[docs] def fetch_token_from_env(self, env_var_name: str) -> Optional[str]: """ Fetch token from environment variable. Key Steps: 1. Retrieve environment variable value using os.getenv 2. Log success if token is found 3. Log warning if environment variable is not found 4. Return token or None Args: env_var_name (str): Environment variable name containing the token Returns: Optional[str]: Token if found, None otherwise Raises: None: This method handles all cases gracefully """ token = os.getenv(env_var_name) if token: self.logger.info("Token retrieved from environment variable: %s", env_var_name) return token self.logger.warning("Environment variable %s not found", env_var_name) return None
[docs] def fetch_token_from_ssm(self, parameter_name: str, decrypt: bool = True) -> Optional[str]: """ Fetch token from AWS SSM Parameter Store. Key Steps: 1. Create SSM client using configured AWS credentials 2. Call get_parameter with decryption option 3. Extract token value from response 4. Handle various error conditions (not found, access denied, etc.) 5. Log success or appropriate error messages Args: parameter_name (str): SSM parameter name containing the token decrypt (bool): Whether to decrypt SecureString parameters (default: True) Returns: Optional[str]: Token if found, None otherwise Raises: None: This method handles all exceptions internally """ try: self.logger.debug("Attempting to fetch token from SSM parameter: %s", parameter_name) ssm_client = self._get_boto3_client('ssm') response = ssm_client.get_parameter( Name=parameter_name, WithDecryption=decrypt ) token = response['Parameter']['Value'] self.logger.info("Successfully retrieved token from SSM parameter: %s", parameter_name) return token except ClientError as e: error_code = e.response['Error']['Code'] if error_code == 'ParameterNotFound': self.logger.warning("SSM parameter not found: %s", parameter_name) elif error_code == 'AccessDenied': self.logger.error("Access denied to SSM parameter: %s", parameter_name) else: self.logger.error("Error retrieving SSM parameter %s: %s", parameter_name, e) return None except Exception as e: self.logger.error("Unexpected error retrieving SSM parameter %s: %s", parameter_name, e) return None
[docs] def fetch_token_from_secrets_manager(self, secret_arn: str) -> Optional[str]: """ Fetch token from AWS Secrets Manager. Key Steps: 1. Create Secrets Manager client using configured AWS credentials 2. Call get_secret_value with the provided secret ARN 3. Extract SecretString value from response 4. Handle various error conditions (not found, access denied, etc.) 5. Log success or appropriate error messages Args: secret_arn (str): Secret ARN or name containing the token Returns: Optional[str]: Token if found, None otherwise Raises: None: This method handles all exceptions internally """ try: self.logger.debug("Attempting to fetch token from Secrets Manager: %s", secret_arn) secrets_client = self._get_boto3_client('secretsmanager') response = secrets_client.get_secret_value(SecretId=secret_arn) if 'SecretString' in response: secret_value = response['SecretString'] self.logger.info("Successfully retrieved token from Secrets Manager: %s", secret_arn) return secret_value self.logger.error("Secret does not contain string value: %s", secret_arn) return None except ClientError as e: error_code = e.response['Error']['Code'] if error_code == 'ResourceNotFoundException': self.logger.warning("Secret not found: %s", secret_arn) elif error_code == 'AccessDenied': self.logger.error("Access denied to secret: %s", secret_arn) else: self.logger.error("Error retrieving secret %s: %s", secret_arn, e) return None except Exception as e: self.logger.error("Unexpected error retrieving secret %s: %s", secret_arn, e) return None
[docs] def fetch_token(self, env_var: Optional[str] = None, ssm_parameter: Optional[str] = None, secret_arn: Optional[str] = None) -> str: """ Fetch token from the specified source with priority order. Key Steps: 1. Check environment variable source first (highest priority) 2. Check SSM parameter source second 3. Check Secrets Manager source third 4. Raise ValueError if no source is specified 5. Return the token from the first available source Args: env_var (Optional[str]): Environment variable name containing the token ssm_parameter (Optional[str]): SSM parameter name containing the token secret_arn (Optional[str]): Secrets Manager secret ARN containing the token Returns: str: Token from the specified source Raises: ValueError: If no token source is specified NoCredentialsError: If AWS credentials are not configured for SSM/Secrets Manager ClientError: If AWS service calls fail """ self.logger.debug("Attempting to fetch token from configured sources") if env_var: self.logger.debug("Fetching token from environment variable: %s", env_var) return self.fetch_token_from_env(env_var) if ssm_parameter: self.logger.debug("Fetching token from SSM parameter: %s", ssm_parameter) return self.fetch_token_from_ssm(ssm_parameter) if secret_arn: self.logger.debug("Fetching token from Secrets Manager: %s", secret_arn) return self.fetch_token_from_secrets_manager(secret_arn) self.logger.error("No token source specified") raise ValueError("No token source specified")
[docs] class AmorphicApiClient(ApiClient): """Custom API client for Amorphic API that extends the base ApiClient""" # Shared ThreadPoolExecutor for async usage stats operations # This is a class variable (shared across all instances) because: # 1. We want one thread pool for all usage stats, not one per client instance # 2. More efficient resource usage - don't create new thread pools unnecessarily # 3. max_workers=2 allows 2 concurrent stats sends without blocking _executor = concurrent.futures.ThreadPoolExecutor(max_workers=2) # Track if we've registered the shutdown hook (only need to register once globally) # This ensures all pending usage stats are sent before the program exits _executor_shutdown_registered = False
    def __init__(self, configuration: Configuration, role_id: str,
                 custom_logger: Optional[logging.Logger] = None,
                 enable_audit_stats: bool = False):
        """
        Initialize the Amorphic API client with the given configuration and role ID.

        Key Steps:
            1. Initialize logger (custom or default) BEFORE calling super(),
               so configuration logging can happen first
            2. Optionally log the (sanitized) configuration in debug mode
            3. Initialize base ApiClient state and usage-stats bookkeeping
            4. Create the management API instance, with interception when
               audit stats are enabled
            5. Check version compatibility with the Amorphic API

        Args:
            configuration (Configuration): Configuration object containing:
                - host: API host URL
                - api_key: Authentication key dictionary
                - ssl_ca_cert: SSL certificate path (optional)
                - debug: Enable debug logging
            role_id (str): The role ID for authentication and API calls.
            custom_logger (Optional[logging.Logger]): Custom logger instance.
                If not provided, a default console logger is created.
            enable_audit_stats (bool): Enable automatic usage statistics
                collection for product team analytics. Defaults to False.

        Raises:
            ValueError: If the API version is outside the supported range.
        """
        # Initialize logger - use provided logger or create default one
        if custom_logger is not None:
            self.logger = custom_logger
            # Lower the provided logger to DEBUG when debug mode is on.
            # NOTE(review): a logger at NOTSET (level 0) fails this `> DEBUG`
            # check and is left untouched — confirm that is intended.
            if configuration.debug and self.logger.level > logging.DEBUG:
                self.logger.setLevel(logging.DEBUG)
        else:
            # Create default logger if none provided
            self.logger = logging.getLogger(__name__)
            if not self.logger.handlers:
                handler = logging.StreamHandler()
                formatter = logging.Formatter(
                    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
                )
                handler.setFormatter(formatter)
                self.logger.addHandler(handler)
            # Set log level based on debug configuration
            if configuration.debug:
                self.logger.setLevel(logging.DEBUG)
            else:
                self.logger.setLevel(logging.INFO)
        # Log configuration values only when debug is True
        if configuration.debug:
            self._log_configuration(configuration)
        super().__init__(configuration)
        self.role_id = role_id
        # Initialize usage stats collection
        self.enable_audit_stats = enable_audit_stats
        # Local counters: per-method call counts plus a running total.
        self.audit_stats = {
            'method_calls': {},
            'total_operations': 0
        }
        self.operation_history = []
        # Load SDK version once during initialization
        self.sdk_version = self._get_sdk_version()
        # Create management_api; intercept its methods for usage tracking
        # only when audit stats are enabled.
        self.management_api = ManagementApi(self)
        if self.enable_audit_stats:
            self._intercept_api_methods(self.management_api)
        try:
            self._check_version_compatibility()
        except ValueError:
            raise  # Re-raise version compatibility errors
        except Exception as e:
            # Best-effort: lack of system info must not block client creation.
            self.logger.warning("Could not get system information during initialization: %s", e)
def _log_configuration(self, configuration: Configuration) -> None: """ Log configuration values when debug mode is enabled. Key Steps: 1. Extract configuration values from the Configuration object 2. Sanitize sensitive information (API keys) 3. Test JSON serialization for each value 4. Log the sanitized configuration dictionary Args: configuration (Configuration): The configuration object to log Returns: None Raises: None: This method handles all exceptions internally and continues execution """ config_dict = {} for k, v in configuration.__dict__.items(): if v is not None and k not in ['logger', 'logger_file_handler', 'debug']: try: # Test if value can be serialized if isinstance(v, dict): v_copy = v.copy() if 'api_key' in v_copy and isinstance(v_copy['api_key'], dict): v_copy['api_key']['LambdaAuthorizer'] = '********' json.dumps(v_copy) # Test serialization config_dict[k] = v_copy else: json.dumps({k: v}) # Test serialization config_dict[k] = v except (TypeError, ValueError): continue self.logger.debug("Configuration passed to the client: %s", json.dumps(config_dict, indent=2)) def _check_version_compatibility(self) -> None: """ Check version compatibility between the SDK and the Amorphic API. Key Steps: 1. Load version configuration from version_config.json 2. Find the current SDK version configuration 3. Retrieve system information from the API 4. Compare API version against minimum and maximum supported versions 5. 
Log system information if compatibility check passes Args: None Returns: None Raises: ValueError: If no current SDK version is found in configuration ValueError: If API version is below minimum required version ValueError: If API version is above maximum allowed version Exception: If system information cannot be retrieved """ # Load version configuration config_path = os.path.join(os.path.dirname(__file__), 'version_config.json') with open(config_path, 'r', encoding='utf-8') as f: version_config = json.load(f) # Find current SDK version configuration current_version_config = None for config in version_config.values(): if config.get('is_current', False): current_version_config = config break if not current_version_config: raise ValueError("No current SDK version found in version configuration") min_version = current_version_config.get('min_amorphic_version') max_version = current_version_config.get('max_amorphic_version') # Get system information system_info = self.management_api.get_system_information(role_id=self.role_id) current_version = system_info.version # Check version compatibility if min_version and version_parse(current_version) < version_parse(min_version): raise ValueError( f"API version {current_version} is below minimum required version {min_version}" ) if max_version and version_parse(current_version) > version_parse(max_version): raise ValueError( f"API version {current_version} is above maximum allowed version {max_version}" ) self._log_system_info(system_info) def _log_system_info(self, system_info: Any) -> None: """ Log system information retrieved from the Amorphic API. Key Steps: 1. Format and display system version information 2. Log environment details (environment, project name, etc.) 3. Display AWS configuration details (region, account ID) 4. 
Add visual separators for readability Args: system_info (Any): System information object containing: - version: API version - environment: Environment name - project_name: Project name - project_shortname: Project shortname - aws_region: AWS region - aws_account_id: AWS account ID Returns: None Raises: None: This method handles all exceptions internally """ self.logger.info("\nSystem Information:") self.logger.info("=" * 50) self.logger.info("Version: %s", system_info.version) self.logger.info("Environment: %s", system_info.environment) self.logger.info("Project Name: %s", system_info.project_name) self.logger.info("Project Shortname: %s", system_info.project_shortname) self.logger.info("AWS Region: %s", system_info.aws_region) self.logger.info("AWS Account ID: %s", system_info.aws_account_id) self.logger.info("-" * 50) def _get_sdk_version(self): """ Load version configuration and return the current SDK version. Key Steps: 1. Load version_config.json from the version directory 2. Parse JSON configuration 3. Find the version marked as 'is_current': True 4. Return version string or "Unknown" if not found Args: None Returns: str: The version string that is marked as current, or "Unknown" if not found Raises: None: This method handles all exceptions internally and returns "Unknown" on failure """ sdk_version = "Unknown" try: # Get the path to version_config.json in the version directory config_path = os.path.join(os.path.dirname(__file__), 'version_config.json') with open(config_path, 'r', encoding='utf-8') as f: version_config = json.load(f) # Find current SDK version configuration for version, config in version_config.items(): if config.get('is_current', False): sdk_version = version break return sdk_version except (FileNotFoundError, json.JSONDecodeError, KeyError) as e: self.logger.warning("Failed to load SDK version: %s", str(e)) return sdk_version
[docs] @classmethod def create_with_auth(cls, host: str, role_id: str, ssl_ca_cert: Optional[str] = None, debug: bool = False, aws_profile: Optional[str] = None, env_var: Optional[str] = None, ssm_parameter: Optional[str] = None, secret_arn: Optional[str] = None, token: Optional[str] = None, custom_logger: Optional[logging.Logger] = None) -> 'AmorphicApiClient': """ Factory method to create AmorphicApiClient with flexible authentication. Key Steps: 1. Fetch authentication token from specified source (if not provided directly) 2. Create Configuration object with host, token, and SSL settings 3. Initialize AmorphicApiClient with the configuration 4. Return fully configured client instance Args: host (str): API host URL (e.g., 'https://api.amorphic.com') role_id (str): Role ID for authentication and API calls ssl_ca_cert (Optional[str]): SSL CA certificate path for custom certificates debug (bool): Enable debug mode for detailed logging aws_profile (Optional[str]): AWS profile name for local development env_var (Optional[str]): Environment variable name containing token ssm_parameter (Optional[str]): SSM parameter name containing token secret_arn (Optional[str]): Secrets Manager secret ARN containing token token (Optional[str]): Direct token value (bypasses token fetching) custom_logger (Optional[logging.Logger]): Custom logger instance Returns: AmorphicApiClient: Configured client instance ready for API calls Raises: ValueError: If no token source is configured and no direct token provided NoCredentialsError: If AWS credentials are not configured for SSM/Secrets Manager ClientError: If AWS service calls fail (SSM/Secrets Manager) """ if not token: token_fetcher = TokenFetcher(aws_profile=aws_profile, logger=custom_logger) token = token_fetcher.fetch_token( env_var=env_var, ssm_parameter=ssm_parameter, secret_arn=secret_arn ) configuration = Configuration( host=host, api_key={'LambdaAuthorizer': token}, ssl_ca_cert=ssl_ca_cert, debug=debug ) return 
cls(configuration=configuration, role_id=role_id, custom_logger=custom_logger)
def _send_audit_stats(self, method_name: str, success: bool = True) -> None: """ Automatically send usage statistics to the product team asynchronously. Key Steps: 1. Check if usage stats are enabled 2. Prepare usage data with method name as action 3. Submit async task to ThreadPoolExecutor 4. Register shutdown hook for graceful cleanup Args: method_name (str): The API method name that was called (e.g., 'create_domain') success (bool): Whether the operation was successful (default: True) Returns: None Raises: None: This method handles all exceptions internally to prevent breaking customer operations """ if not self.enable_audit_stats: return # Prepare the usage stats payload # Format: {"Action": "create_domain", "AdditionalInfo": {"SDKVersion": "3.2"}} audit_data = { "Action": method_name, "AdditionalInfo": {"SDKVersion": self.sdk_version} } # Submit the async task to ThreadPoolExecutor # This runs in a background thread so it doesn't block the main operation # The future object is returned but we don't wait for it - fire and forget future = self._executor.submit(self._send_audit_stats_sync, audit_data) # Register shutdown hook to ensure threads complete before program exits # This only needs to be registered once (first time this method is called) # The shutdown hook waits for all pending threads to finish gracefully if not self._executor_shutdown_registered: atexit.register(self._executor.shutdown, wait=True) self._executor_shutdown_registered = True def _send_audit_stats_sync(self, audit_data): """ Synchronous method to send usage stats (called by ThreadPoolExecutor). 
Args: audit_data (dict): The audit data to send """ try: post_app_usage_body = PostAppUsageBody.from_dict(audit_data) self.management_api.send_app_usage( role_id=self.role_id, usage_metrics_for="sdk", post_app_usage_body=post_app_usage_body, _headers={"X-Source": "sdk"} ) self.logger.debug(f"Async usage stats sent: {audit_data['Action']}") except Exception as e: self.logger.warning(f"Failed async usage stats send: {str(e)}") def _track_resource_operation(self, method_name: str, success: bool = True) -> None: """ Track successful resource operations and send usage stats. Key Steps: 1. Check if usage stats are enabled 2. Skip tracking for failed operations 3. Skip excluded methods 4. Update local usage statistics 5. Store operation in history 6. Send usage stats to product team Args: method_name (str): The API method name that was called success (bool): Whether the operation was successful (default: True) Returns: None Raises: None: This method handles all exceptions internally """ if not self.enable_audit_stats: return # Filter 1: Only track successful operations (failures don't count for usage stats) # This ensures we only report actual successful API usage if not success: self.logger.debug(f"Skipping usage stats for failed operation: {method_name}") return # Filter 2: Skip tracking for excluded methods (internal methods, special cases) # Examples: 'send_app_usage' (to prevent recursion), '__init__', etc. 
if method_name.lower() in EXCLUDED_METHODS: self.logger.debug(f"Skipping usage stats for excluded method: {method_name}") return # Step 1: Update local in-memory statistics # This keeps track of how many times each method was called if 'method_calls' not in self.audit_stats: self.audit_stats['method_calls'] = {} if method_name not in self.audit_stats['method_calls']: self.audit_stats['method_calls'][method_name] = 0 self.audit_stats['method_calls'][method_name] += 1 self.audit_stats['total_operations'] += 1 # Step 2: Store operation in history for detailed tracking # This includes timestamp so we can see when operations occurred self.operation_history.append({ 'timestamp': time.time(), 'method': method_name, 'success': True, # Always True since we only track successful operations }) # Step 3: Send usage stats to backend asynchronously # This happens in the background so it doesn't slow down the user's operation self._send_audit_stats(method_name, success=True)
def get_audit_stats(self) -> Dict[str, Any]:
    """Return the usage statistics collected during this session.

    Args:
        None

    Returns:
        Dict[str, Any]: When tracking is enabled, a dictionary with:
            - total_operations: total number of operations performed
            - method_calls: per-method call counts
            - operation_history: list of operation details with timestamps
            When disabled: {'message': 'Usage statistics are disabled'}.

    Raises:
        None: This method has no external dependencies.
    """
    if not self.enable_audit_stats:
        return {'message': 'Usage statistics are disabled'}

    # Copy the two counters out of audit_stats, then attach the history.
    snapshot = {key: self.audit_stats[key] for key in ('total_operations', 'method_calls')}
    snapshot['operation_history'] = self.operation_history
    return snapshot
def get_audit_summary(self) -> str:
    """Return a human-readable usage summary for the current session.

    Args:
        None

    Returns:
        str: Total operations plus a per-method call breakdown, or
            "Usage statistics are disabled" when tracking is off.

    Raises:
        None: This method has no external dependencies.
    """
    if not self.enable_audit_stats:
        return "Usage statistics are disabled"

    lines = [f"Total Operations: {self.audit_stats['total_operations']}"]
    per_method = self.audit_stats.get('method_calls', {})
    if per_method:
        # One indented line per tracked method with its call count.
        lines.append("Method Calls:")
        lines.extend(f" - {name}: {count}" for name, count in per_method.items())
    return "\n".join(lines)
def _intercept_api_call(self, api_instance, method_name: str, *args, **kwargs): """ Intercept API calls to automatically track resource operations. Key Steps: 1. Get the original method from stored methods to prevent recursion 2. Execute the original API call with provided arguments 3. Track successful operations for usage statistics 4. Handle and re-raise any exceptions from the original call 5. Log failed operations without sending usage stats Args: api_instance: The API instance containing the method method_name (str): Name of the method being called *args: Positional arguments for the method call **kwargs: Keyword arguments for the method call Returns: Any: Result from the original API method call Raises: Exception: Any exception raised by the original API method call """ try: # Step 1: Get the original method from our stored cache # We store original methods to prevent infinite recursion if the method # tries to call itself or if we intercept it multiple times if hasattr(api_instance, '_original_methods') and method_name in api_instance._original_methods: method = api_instance._original_methods[method_name] else: # Fallback: if original not cached, get it directly (shouldn't happen normally) method = getattr(api_instance, method_name) # Step 2: Execute the actual API call with the original method # This is the real API operation (e.g., create_domain, list_agents, etc.) result = method(*args, **kwargs) # Step 3: If the call succeeded, track it for usage statistics # Failed calls are caught below and NOT tracked self._track_resource_operation(method_name, success=True) return result except Exception as e: # Step 4: If the API call failed, re-raise the exception # We do NOT track failed operations - only successful ones count for usage stats # This ensures users aren't penalized for errors or validation failures self.logger.debug(f"API call failed (no usage stats sent): {method_name} - {str(e)}") raise e
[docs] def get_system_information(self, role_id: str) -> Any: """ Get system information from the Amorphic API. Key Steps: 1. Validate that role_id is provided and not empty 2. Call the management API to retrieve system information 3. Return system information including version, environment, project details 4. Handle and re-raise any API exceptions Args: role_id (str): The role ID for authentication. Must be provided and non-empty. Returns: SystemInformation: The system information response containing: - version: API version - environment: Environment name - project_name: Project name - project_shortname: Project shortname - aws_region: AWS region - aws_account_id: AWS account ID Raises: ValueError: If role_id is None or empty ApiException: If the API call fails """ if not role_id: raise ValueError("role_id cannot be null or empty") try: return self.management_api.get_system_information(role_id) except ApiException as e: self.logger.error("Exception when calling ManagementApi->get_system_information: %s", e) raise
def _intercept_api_methods(self, api_instance) -> None:
    """Wrap every eligible public method of an API instance for tracking.

    Each public, callable, non-excluded attribute is replaced by a thin
    wrapper that routes the call through ``_intercept_api_call``, which
    invokes the cached original and records successful calls. Originals
    are kept in ``api_instance._original_methods`` so repeated wrapping
    (or a wrapped method calling itself) can never recurse into a wrapper.

    Args:
        api_instance: The API instance whose methods should be wrapped.

    Returns:
        None

    Raises:
        None: This method handles all exceptions internally.
    """
    # Per-instance cache of unwrapped methods; created once per instance.
    if not hasattr(api_instance, '_original_methods'):
        api_instance._original_methods = {}
    originals = api_instance._original_methods

    def make_wrapper(method_name: str, unwrapped):
        # Bind method_name/unwrapped here to avoid the late-binding
        # closure pitfall inside the loop below; functools.wraps keeps
        # the original name/docstring on the wrapper.
        @functools.wraps(unwrapped)
        def wrapper(*args, **kwargs):
            return self._intercept_api_call(api_instance, method_name, *args, **kwargs)
        return wrapper

    for attr_name in dir(api_instance):
        # Skip dunder/private names; only public attributes are candidates.
        if attr_name.startswith('_'):
            continue
        attribute = getattr(api_instance, attr_name)
        # Skip non-callables and excluded methods (e.g. 'send_app_usage',
        # which must never be intercepted or tracking would recurse).
        if not callable(attribute) or attr_name in EXCLUDED_METHODS:
            continue
        # Cache the unwrapped method exactly once, then install the wrapper.
        if attr_name not in originals:
            originals[attr_name] = attribute
        setattr(api_instance, attr_name, make_wrapper(attr_name, originals[attr_name]))