gen_ai_hub.orchestration.service
index
/home/jenkins/agent/workspace/ation_generative-ai-hub-sdk_main/gen_ai_hub/orchestration/service.py

Module for orchestration service handling requests and responses.
 
Provides synchronous and asynchronous methods to run orchestration pipelines.

 
Modules
       
asyncio
dacite
httpx
logging
random
time

 
Classes
       
builtins.object
OrchestrationService
gen_ai_hub.orchestration.models.base.JSONSerializable(abc.ABC)
OrchestrationRequest

 
class OrchestrationRequest(gen_ai_hub.orchestration.models.base.JSONSerializable)
    OrchestrationRequest(config: gen_ai_hub.orchestration.models.config.OrchestrationConfig, template_values: List[gen_ai_hub.orchestration.models.template.TemplateValue], history: List[gen_ai_hub.orchestration.models.message.Message]) -> None
 
Represents a request for the orchestration process, including configuration,
template values, and message history.
 
 
Method resolution order:
OrchestrationRequest
gen_ai_hub.orchestration.models.base.JSONSerializable
abc.ABC
builtins.object

Methods defined here:
__eq__(self, other)
Return self==value.
__init__(self, config: gen_ai_hub.orchestration.models.config.OrchestrationConfig, template_values: List[gen_ai_hub.orchestration.models.template.TemplateValue], history: List[gen_ai_hub.orchestration.models.message.Message]) -> None
Initialize self.  See help(type(self)) for accurate signature.
__repr__(self)
Return repr(self).
to_dict(self)
Converts the OrchestrationRequest instance to a dictionary.
 
:return: Dictionary representation of the OrchestrationRequest
:rtype: dict

Data and other attributes defined here:
__abstractmethods__ = frozenset()
__annotations__ = {'config': <class 'gen_ai_hub.orchestration.models.config.OrchestrationConfig'>, 'history': typing.List[gen_ai_hub.orchestration.models.message.Message], 'template_values': typing.List[gen_ai_hub.orchestration.models.template.TemplateValue]}
__dataclass_fields__ = {'config': Field(name='config',type=<class 'gen_ai_hub.orch...appingproxy({}),kw_only=False,_field_type=_FIELD), 'history': Field(name='history',type=typing.List[gen_ai_hub...appingproxy({}),kw_only=False,_field_type=_FIELD), 'template_values': Field(name='template_values',type=typing.List[ge...appingproxy({}),kw_only=False,_field_type=_FIELD)}
__dataclass_params__ = _DataclassParams(init=True,repr=True,eq=True,order=False,unsafe_hash=False,frozen=False)
__hash__ = None
__match_args__ = ('config', 'template_values', 'history')

Data descriptors inherited from gen_ai_hub.orchestration.models.base.JSONSerializable:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)

 
class OrchestrationService(builtins.object)
    OrchestrationService(api_url: Optional[str] = None, config: Optional[gen_ai_hub.orchestration.models.config.OrchestrationConfig] = None, proxy_client: Optional[gen_ai_hub.proxy.gen_ai_hub_proxy.client.GenAIHubProxyClient] = None, deployment_id: Optional[str] = None, config_name: Optional[str] = None, config_id: Optional[str] = None, timeout: Union[int, float, httpx.Timeout, NoneType] = None)
 
A service for executing orchestration requests, generating LLM content through a pipeline of
configured modules. This service supports both synchronous and asynchronous request execution.
For streaming responses, special care is taken not to close the underlying HTTP stream prematurely.
 
https://api.sap.com/api/ORCHESTRATION_API/overview
 
Methods defined here:
__init__(self, api_url: Optional[str] = None, config: Optional[gen_ai_hub.orchestration.models.config.OrchestrationConfig] = None, proxy_client: Optional[gen_ai_hub.proxy.gen_ai_hub_proxy.client.GenAIHubProxyClient] = None, deployment_id: Optional[str] = None, config_name: Optional[str] = None, config_id: Optional[str] = None, timeout: Union[int, float, httpx.Timeout, NoneType] = None)
Initializes the OrchestrationService with the provided parameters.
 
:param api_url: The base URL for the orchestration API, defaults to None
:type api_url: Optional[str], optional
:param config: The default orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param proxy_client: The GenAIHubProxyClient instance, defaults to None
:type proxy_client: Optional[GenAIHubProxyClient], optional
:param deployment_id: the deployment ID, defaults to None
:type deployment_id: Optional[str], optional
:param config_name: the configuration name, defaults to None
:type config_name: Optional[str], optional
:param config_id: the configuration ID, defaults to None
:type config_id: Optional[str], optional
:param timeout: the timeout for HTTP requests, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
async aclose_http_connection(self)
Closes the httpx asynchronous client.
async arun(self, config: Optional[gen_ai_hub.orchestration.models.config.OrchestrationConfig] = None, template_values: Optional[List[gen_ai_hub.orchestration.models.template.TemplateValue]] = None, history: Optional[List[gen_ai_hub.orchestration.models.message.Message]] = None, timeout: Union[int, float, httpx.Timeout, NoneType] = None) -> gen_ai_hub.orchestration.models.response.OrchestrationResponse
Executes an orchestration request asynchronously (non-streaming).
 
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param template_values: the template values for the request, defaults to None
:type template_values: Optional[List[TemplateValue]], optional
:param history: the message history, defaults to None
:type history: Optional[List[Message]], optional
:param timeout: the timeout for the request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:return: An OrchestrationResponse object.
:rtype: OrchestrationResponse
async arun_with_retries(self, config: Optional[gen_ai_hub.orchestration.models.config.OrchestrationConfig] = None, template_values: Optional[List[gen_ai_hub.orchestration.models.template.TemplateValue]] = None, history: Optional[List[gen_ai_hub.orchestration.models.message.Message]] = None, timeout: Union[int, float, httpx.Timeout, NoneType] = None, max_retries: int = 10, base_delay: float = 1.0) -> gen_ai_hub.orchestration.models.response.OrchestrationResponseWithRetries | None
Executes an orchestration request asynchronously with automatic retry on rate limits (429) and 
server errors. Uses exponential backoff with jitter to handle rate limiting gracefully.
 
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param template_values: the template values for the request, defaults to None
:type template_values: Optional[List[TemplateValue]], optional
:param history: the message history, defaults to None
:type history: Optional[List[Message]], optional
:param timeout: the timeout for the request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:param max_retries: the maximum number of retry attempts, defaults to 10
:type max_retries: int, optional
:param base_delay: the initial delay between retries in seconds, defaults to 1.0
:type base_delay: float, optional
:return: An OrchestrationResponseWithRetries with retry count information.
:rtype: OrchestrationResponseWithRetries | None
:raises OrchestrationError: If the request fails after all retries (includes retry count).
:raises ValueError: If no configuration is provided.
async astream(self, config: Optional[gen_ai_hub.orchestration.models.config.OrchestrationConfig] = None, template_values: Optional[List[gen_ai_hub.orchestration.models.template.TemplateValue]] = None, history: Optional[List[gen_ai_hub.orchestration.models.message.Message]] = None, stream_options: Optional[dict] = None, timeout: Union[int, float, httpx.Timeout, NoneType] = None) -> gen_ai_hub.orchestration.sse_client.AsyncSSEClient
Executes an orchestration request asynchronously in streaming mode.
 
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param template_values: the template values for the request, defaults to None
:type template_values: Optional[List[TemplateValue]], optional
:param history: the message history, defaults to None
:type history: Optional[List[Message]], optional
:param stream_options: the additional streaming options, defaults to None
:type stream_options: Optional[dict], optional
:param timeout: the timeout for the request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:return: An AsyncSSEClient instance for iterating over the streaming response.
:rtype: AsyncSSEClient
close_http_connection(self)
Closes the httpx synchronous client.
handle_retry(self, retry_count: int, base_delay: float, error: gen_ai_hub.orchestration.exceptions.OrchestrationError, max_retries: int) -> float
Handles retry logic with exponential backoff and jitter.
If a Retry-After header is present, its value is used as the minimum delay, with jitter added on top.
 
:param retry_count: the current retry attempt number
:type retry_count: int
:param base_delay: the initial delay between retries in seconds
:type base_delay: float
:param error: the exception that occurred
:type error: OrchestrationError
:param max_retries: the maximum number of retry attempts
:type max_retries: int
:raises error: Raises the original error if no more retries should be attempted
:return: number of seconds to wait before next retry
:rtype: float
run(self, config: Optional[gen_ai_hub.orchestration.models.config.OrchestrationConfig] = None, template_values: Optional[List[gen_ai_hub.orchestration.models.template.TemplateValue]] = None, history: Optional[List[gen_ai_hub.orchestration.models.message.Message]] = None, timeout: Union[int, float, httpx.Timeout, NoneType] = None) -> gen_ai_hub.orchestration.models.response.OrchestrationResponse
Executes an orchestration request synchronously (non-streaming).
 
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param template_values: the template values for the request, defaults to None
:type template_values: Optional[List[TemplateValue]], optional
:param history: the message history, defaults to None
:type history: Optional[List[Message]], optional
:param timeout: the timeout for the request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:return: An OrchestrationResponse object.
:rtype: OrchestrationResponse
run_with_retries(self, config: Optional[gen_ai_hub.orchestration.models.config.OrchestrationConfig] = None, template_values: Optional[List[gen_ai_hub.orchestration.models.template.TemplateValue]] = None, history: Optional[List[gen_ai_hub.orchestration.models.message.Message]] = None, timeout: Union[int, float, httpx.Timeout, NoneType] = None, max_retries: int = 10, base_delay: float = 1.0) -> gen_ai_hub.orchestration.models.response.OrchestrationResponseWithRetries | None
Executes an orchestration request with automatic retry on rate limits (429) and server errors.
 
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param template_values: the template values for the request, defaults to None
:type template_values: Optional[List[TemplateValue]], optional
:param history: the message history, defaults to None
:type history: Optional[List[Message]], optional
:param timeout: the timeout for the request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:param max_retries: the maximum number of retry attempts, defaults to 10
:type max_retries: int, optional
:param base_delay: the initial delay between retries in seconds, defaults to 1.0
:type base_delay: float, optional
:return: An OrchestrationResponseWithRetries with retry count information.
:rtype: OrchestrationResponseWithRetries | None
:raises OrchestrationError: If the request fails after all retries (includes retry count).
:raises ValueError: If no configuration is provided.
stream(self, config: Optional[gen_ai_hub.orchestration.models.config.OrchestrationConfig] = None, template_values: Optional[List[gen_ai_hub.orchestration.models.template.TemplateValue]] = None, history: Optional[List[gen_ai_hub.orchestration.models.message.Message]] = None, stream_options: Optional[dict] = None, timeout: Union[int, float, httpx.Timeout, NoneType] = None) -> gen_ai_hub.orchestration.sse_client.SSEClient
Executes an orchestration request in streaming mode (synchronously).
 
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param template_values: the template values for the request, defaults to None
:type template_values: Optional[List[TemplateValue]], optional
:param history: the message history, defaults to None
:type history: Optional[List[Message]], optional
:param stream_options: the additional streaming options, defaults to None
:type stream_options: Optional[dict], optional
:param timeout: the timeout for the request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:return: An SSEClient instance for iterating over the streaming response.
:rtype: SSEClient

Data descriptors defined here:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)

 
Functions
       
cache_if_not_none(func)
Custom cache decorator that only caches non-None results
 
:param func: The function to be decorated.
:type func: callable
:return: The decorated function with caching behavior.
:rtype: callable
discover_orchestration_api_url(base_url: str, auth_url: str, client_id: str, client_secret: str, resource_group: str, config_id: Optional[str] = None, config_name: Optional[str] = None, orchestration_scenario: str = 'orchestration', executable_id: str = 'orchestration') -> Optional[str]
Discovers the orchestration API URL based on provided configuration details.
 
:param base_url: the base URL for the AI Core API.
:type base_url: str
:param auth_url: the URL for the AI Core authentication service.
:type auth_url: str
:param client_id: the client ID for the AI Core API.
:type client_id: str
:param client_secret: the client secret for the AI Core API.
:type client_secret: str
:param resource_group: the resource group for the AI Core API.
:type resource_group: str
:param config_id: the configuration ID, defaults to None
:type config_id: Optional[str], optional
:param config_name: the configuration name, defaults to None
:type config_name: Optional[str], optional
:param orchestration_scenario: the orchestration scenario ID, defaults to "orchestration"
:type orchestration_scenario: str, optional
:param executable_id: the orchestration executable ID, defaults to "orchestration"
:type executable_id: str, optional
:return: The orchestration API URL or None if no deployment is found.
:rtype: Optional[str]
get_orchestration_api_url(proxy_client: gen_ai_hub.proxy.gen_ai_hub_proxy.client.GenAIHubProxyClient, deployment_id: Optional[str] = None, config_name: Optional[str] = None, config_id: Optional[str] = None) -> str
Retrieves the orchestration API URL based on provided deployment or configuration details.
 
:param proxy_client: The GenAIHubProxyClient instance.
:type proxy_client: GenAIHubProxyClient
:param deployment_id: the deployment ID, defaults to None
:type deployment_id: Optional[str], optional
:param config_name: the configuration name, defaults to None
:type config_name: Optional[str], optional
:param config_id: the configuration ID, defaults to None
:type config_id: Optional[str], optional
:raises ValueError: If no orchestration deployment is found.
:return: The orchestration API URL.
:rtype: str

 
Data
        COMPLETION_SUFFIX = '/completion'
Iterable = typing.Iterable
List = typing.List
Optional = typing.Optional
Union = typing.Union