gen_ai_hub.orchestration_v2.service
index
/home/jenkins/agent/workspace/ation_generative-ai-hub-sdk_main/gen_ai_hub/orchestration_v2/service.py

Module for orchestration service handling requests and responses.
 
Provides synchronous and asynchronous methods to run orchestration pipelines.

 
Modules
       
asyncio
httpx
logging
random
time

 
Classes
       
builtins.object
OrchestrationService

 
class OrchestrationService(builtins.object)
    OrchestrationService(api_url: Optional[str] = None, config: Optional[gen_ai_hub.orchestration_v2.models.config.OrchestrationConfig] = None, config_ref: Union[gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByIdConfigRef, gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByNameScenarioVersionConfigRef, NoneType] = None, proxy_client: Optional[gen_ai_hub.proxy.gen_ai_hub_proxy.client.GenAIHubProxyClient] = None, deployment_id: Optional[str] = None, config_name: Optional[str] = None, config_id: Optional[str] = None, timeout: Union[int, float, openai.Timeout, NoneType] = None)
 
A service for executing orchestration requests, enabling the generation of LLM content
through a pipeline of configured modules.
 
This service supports both synchronous and asynchronous request execution. For streaming responses,
special care is taken to not close the underlying HTTP stream prematurely.
 
See https://api.sap.com/api/ORCHESTRATION_API_v2/overview
 
Args:
 
api_url: The base URL for the orchestration API.
 
config: The default orchestration configuration.
 
config_ref: The reference to the default orchestration configuration.
 
proxy_client: A GenAIHubProxyClient instance.
 
deployment_id: Optional deployment ID.
 
config_name: Optional configuration name.
 
config_id: Optional configuration ID.
 
timeout: Optional timeout for HTTP requests.
 
  Methods defined here:
__init__(self, api_url: Optional[str] = None, config: Optional[gen_ai_hub.orchestration_v2.models.config.OrchestrationConfig] = None, config_ref: Union[gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByIdConfigRef, gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByNameScenarioVersionConfigRef, NoneType] = None, proxy_client: Optional[gen_ai_hub.proxy.gen_ai_hub_proxy.client.GenAIHubProxyClient] = None, deployment_id: Optional[str] = None, config_name: Optional[str] = None, config_id: Optional[str] = None, timeout: Union[int, float, openai.Timeout, NoneType] = None)
Initializes the OrchestrationService.
 
:param api_url: the base URL for the orchestration API, defaults to None
:type api_url: Optional[str], optional
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param config_ref: the orchestration configuration reference, defaults to None
:type config_ref: Optional[OrchestrationConfigReference], optional
:param proxy_client: the GenAIHubProxyClient instance, defaults to None
:type proxy_client: Optional[GenAIHubProxyClient], optional
:param deployment_id: the deployment ID, defaults to None
:type deployment_id: Optional[str], optional
:param config_name: the configuration name, defaults to None
:type config_name: Optional[str], optional
:param config_id: the configuration ID, defaults to None
:type config_id: Optional[str], optional
:param timeout: the timeout for HTTP requests, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:raises ValueError: if both config and config_ref are provided.
async aclose_http_connection(self)
Closes the httpx asynchronous client.
async aembed(self, config: gen_ai_hub.orchestration_v2.models.embeddings.EmbeddingsOrchestrationConfig, input: gen_ai_hub.orchestration_v2.models.embeddings.EmbeddingsInput, timeout: Union[int, float, openai.Timeout, NoneType] = None) -> gen_ai_hub.orchestration_v2.models.embeddings.EmbeddingsPostResponse
Executes an embeddings request asynchronously.
 
:param config: the embeddings orchestration configuration
:type config: EmbeddingsOrchestrationConfig
:param input: the input text to embed
:type input: EmbeddingsInput
:param timeout: the timeout overwrite per request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:return: the EmbeddingsPostResponse object
:rtype: EmbeddingsPostResponse
async arun(self, config: Optional[gen_ai_hub.orchestration_v2.models.config.OrchestrationConfig] = None, config_ref: Union[gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByIdConfigRef, gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByNameScenarioVersionConfigRef, NoneType] = None, placeholder_values: Optional[dict] = None, history: Optional[List[Union[gen_ai_hub.orchestration_v2.models.message.SystemMessage, gen_ai_hub.orchestration_v2.models.message.UserMessage, gen_ai_hub.orchestration_v2.models.message.AssistantMessage, gen_ai_hub.orchestration_v2.models.message.ToolChatMessage, gen_ai_hub.orchestration_v2.models.message.DeveloperChatMessage, gen_ai_hub.orchestration_v2.models.message.ResponseChatMessage]]] = None, timeout: Union[int, float, openai.Timeout, NoneType] = None) -> gen_ai_hub.orchestration_v2.models.response.CompletionPostResponse
Executes an orchestration request asynchronously (non-streaming).
 
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param config_ref: the orchestration configuration reference, defaults to None
:type config_ref: Optional[OrchestrationConfigReference], optional
:param placeholder_values: the template values, defaults to None
:type placeholder_values: Optional[dict], optional
:param history: the message history, defaults to None
:type history: Optional[List[ChatMessage]], optional
:param timeout: the timeout overwrite per request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:return: the CompletionPostResponse object
:rtype: CompletionPostResponse
async arun_with_retries(self, config: Optional[gen_ai_hub.orchestration_v2.models.config.OrchestrationConfig] = None, config_ref: Union[gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByIdConfigRef, gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByNameScenarioVersionConfigRef, NoneType] = None, placeholder_values: Optional[dict] = None, history: Optional[List[Union[gen_ai_hub.orchestration_v2.models.message.SystemMessage, gen_ai_hub.orchestration_v2.models.message.UserMessage, gen_ai_hub.orchestration_v2.models.message.AssistantMessage, gen_ai_hub.orchestration_v2.models.message.ToolChatMessage, gen_ai_hub.orchestration_v2.models.message.DeveloperChatMessage, gen_ai_hub.orchestration_v2.models.message.ResponseChatMessage]]] = None, timeout: Union[int, float, openai.Timeout, NoneType] = None, max_retries: int = 10, base_delay: float = 1.0) -> gen_ai_hub.orchestration_v2.models.response.OrchestrationResponseWithRetries | None
Executes an orchestration request asynchronously with automatic retry on rate limits (429) and server errors.
Uses exponential backoff with jitter to handle rate limiting gracefully.
 
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param config_ref: the orchestration configuration reference, defaults to None
:type config_ref: Optional[OrchestrationConfigReference], optional
:param placeholder_values: the template values, defaults to None
:type placeholder_values: Optional[dict], optional
:param history: the message history, defaults to None
:type history: Optional[List[ChatMessage]], optional
:param timeout: the timeout overwrite per request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:param max_retries: the maximum number of retry attempts, defaults to 10
:type max_retries: int, optional
:param base_delay: the initial delay between retries in seconds, defaults to 1.0
:type base_delay: float, optional
:return: the OrchestrationResponseWithRetries with retry count information
:rtype: OrchestrationResponseWithRetries | None
:raises ValueError: if no configuration is provided.
:raises OrchestrationError: if request fails after all retries (includes retry count).
async astream(self, config: Optional[gen_ai_hub.orchestration_v2.models.config.OrchestrationConfig] = None, config_ref: Union[gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByIdConfigRef, gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByNameScenarioVersionConfigRef, NoneType] = None, placeholder_values: Optional[dict] = None, history: Optional[List[Union[gen_ai_hub.orchestration_v2.models.message.SystemMessage, gen_ai_hub.orchestration_v2.models.message.UserMessage, gen_ai_hub.orchestration_v2.models.message.AssistantMessage, gen_ai_hub.orchestration_v2.models.message.ToolChatMessage, gen_ai_hub.orchestration_v2.models.message.DeveloperChatMessage, gen_ai_hub.orchestration_v2.models.message.ResponseChatMessage]]] = None, timeout: Union[int, float, openai.Timeout, NoneType] = None) -> gen_ai_hub.orchestration_v2.sse_client.AsyncSSEClient
Executes an orchestration streaming request asynchronously.
 
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param config_ref: the orchestration configuration reference, defaults to None
:type config_ref: Optional[OrchestrationConfigReference], optional
:param placeholder_values: the template values, defaults to None
:type placeholder_values: Optional[dict], optional
:param history: the message history, defaults to None
:type history: Optional[List[ChatMessage]], optional
:param timeout: the timeout overwrite per request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:return: the AsyncSSEClient object
:rtype: AsyncSSEClient
close_http_connection(self)
Closes the httpx synchronous client.
embed(self, config: gen_ai_hub.orchestration_v2.models.embeddings.EmbeddingsOrchestrationConfig, input: gen_ai_hub.orchestration_v2.models.embeddings.EmbeddingsInput, timeout: Union[int, float, openai.Timeout, NoneType] = None) -> gen_ai_hub.orchestration_v2.models.embeddings.EmbeddingsPostResponse
Executes an embeddings request synchronously.
 
:param config: the embeddings orchestration configuration
:type config: EmbeddingsOrchestrationConfig
:param input: the input text to embed
:type input: EmbeddingsInput
:param timeout: the timeout overwrite per request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:return: the EmbeddingsPostResponse object
:rtype: EmbeddingsPostResponse
handle_retry(self, retry_count: int, base_delay: float, error: gen_ai_hub.orchestration_v2.exceptions.OrchestrationError, max_retries: int) -> float
Handles retry logic with exponential backoff and jitter.
If a Retry-After header exists, it is used as the minimum delay, with jitter added on top.
 
:param retry_count: the incremented retry attempt number
:type retry_count: int
:param base_delay: the initial delay between retries in seconds
:type base_delay: float
:param error: the exception that occurred
:type error: OrchestrationError
:param max_retries: the maximum number of retry attempts
:type max_retries: int
:raises error: re-raises the original error if no retry should be attempted.
:return: the number of seconds to wait before next retry
:rtype: float
run(self, config: Optional[gen_ai_hub.orchestration_v2.models.config.OrchestrationConfig] = None, config_ref: Union[gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByIdConfigRef, gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByNameScenarioVersionConfigRef, NoneType] = None, placeholder_values: Optional[dict] = None, history: Optional[List[Union[gen_ai_hub.orchestration_v2.models.message.SystemMessage, gen_ai_hub.orchestration_v2.models.message.UserMessage, gen_ai_hub.orchestration_v2.models.message.AssistantMessage, gen_ai_hub.orchestration_v2.models.message.ToolChatMessage, gen_ai_hub.orchestration_v2.models.message.DeveloperChatMessage, gen_ai_hub.orchestration_v2.models.message.ResponseChatMessage]]] = None, timeout: Union[int, float, openai.Timeout, NoneType] = None) -> gen_ai_hub.orchestration_v2.models.response.CompletionPostResponse
Executes an orchestration request synchronously (non-streaming).
 
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param config_ref: the orchestration configuration reference, defaults to None
    if not provided, the default configuration is used.
:type config_ref: Optional[OrchestrationConfigReference], optional
:param placeholder_values: the template values, defaults to None
:type placeholder_values: Optional[dict], optional
:param history: the message history, defaults to None
:type history: Optional[List[ChatMessage]], optional
:param timeout: the timeout overwrite per request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:return: the CompletionPostResponse object
:rtype: CompletionPostResponse
run_with_retries(self, config: Optional[gen_ai_hub.orchestration_v2.models.config.OrchestrationConfig] = None, config_ref: Union[gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByIdConfigRef, gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByNameScenarioVersionConfigRef, NoneType] = None, placeholder_values: Optional[dict] = None, history: Optional[List[Union[gen_ai_hub.orchestration_v2.models.message.SystemMessage, gen_ai_hub.orchestration_v2.models.message.UserMessage, gen_ai_hub.orchestration_v2.models.message.AssistantMessage, gen_ai_hub.orchestration_v2.models.message.ToolChatMessage, gen_ai_hub.orchestration_v2.models.message.DeveloperChatMessage, gen_ai_hub.orchestration_v2.models.message.ResponseChatMessage]]] = None, timeout: Union[int, float, openai.Timeout, NoneType] = None, max_retries: int = 10, base_delay: float = 1.0) -> gen_ai_hub.orchestration_v2.models.response.OrchestrationResponseWithRetries | None
Executes an orchestration request with automatic retry on rate limits (429) and server errors.
 
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param config_ref: the orchestration configuration reference, defaults to None
:type config_ref: Optional[OrchestrationConfigReference], optional
:param placeholder_values: the template values, defaults to None
:type placeholder_values: Optional[dict], optional
:param history: the message history, defaults to None
:type history: Optional[List[ChatMessage]], optional
:param timeout: the timeout overwrite per request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:param max_retries: the maximum number of retry attempts, defaults to 10
:type max_retries: int, optional
:param base_delay: the initial delay between retries in seconds, defaults to 1.0
:type base_delay: float, optional
:return: the OrchestrationResponseWithRetries with retry count information
:rtype: OrchestrationResponseWithRetries | None
:raises ValueError: if no configuration is provided.
:raises OrchestrationError: if request fails after all retries (includes retry count).
stream(self, config: Optional[gen_ai_hub.orchestration_v2.models.config.OrchestrationConfig] = None, config_ref: Union[gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByIdConfigRef, gen_ai_hub.orchestration_v2.models.config.CompletionRequestConfigurationReferenceByNameScenarioVersionConfigRef, NoneType] = None, placeholder_values: Optional[dict] = None, history: Optional[List[Union[gen_ai_hub.orchestration_v2.models.message.SystemMessage, gen_ai_hub.orchestration_v2.models.message.UserMessage, gen_ai_hub.orchestration_v2.models.message.AssistantMessage, gen_ai_hub.orchestration_v2.models.message.ToolChatMessage, gen_ai_hub.orchestration_v2.models.message.DeveloperChatMessage, gen_ai_hub.orchestration_v2.models.message.ResponseChatMessage]]] = None, timeout: Union[int, float, openai.Timeout, NoneType] = None) -> Iterable[gen_ai_hub.orchestration_v2.models.response.StreamCompletionPostResponse]
Executes an orchestration streaming request synchronously.
 
:param config: the orchestration configuration, defaults to None
:type config: Optional[OrchestrationConfig], optional
:param config_ref: the orchestration configuration reference, defaults to None
    if not provided, the default configuration is used.
:type config_ref: Optional[OrchestrationConfigReference], optional
:param placeholder_values: the template values, defaults to None
:type placeholder_values: Optional[dict], optional
:param history: the message history, defaults to None
:type history: Optional[List[ChatMessage]], optional
:param timeout: the timeout overwrite per request, defaults to None
:type timeout: Union[int, float, httpx.Timeout, None], optional
:return: An Iterable[StreamCompletionPostResponse] object
:rtype: Iterable[StreamCompletionPostResponse]

Data descriptors defined here:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)

 
Functions
       
cache_if_not_none(func)
Custom cache decorator that only caches non-None results.
discover_orchestration_api_url(base_url: str, auth_url: str, client_id: str, client_secret: str, resource_group: str, config_id: Optional[str] = None, config_name: Optional[str] = None, orchestration_scenario: str = 'orchestration', executable_id: str = 'orchestration') -> Optional[str]
Discovers the orchestration API URL based on provided configuration details.
 
:param base_url: the base URL for the AI Core API.
:type base_url: str
:param auth_url: the URL for the AI Core authentication service.
:type auth_url: str
:param client_id: the client ID for the AI Core API.
:type client_id: str
:param client_secret: the client secret for the AI Core API.
:type client_secret: str
:param resource_group: the resource group for the AI Core API.
:type resource_group: str
:param config_id: the configuration ID, defaults to None
:type config_id: Optional[str], optional
:param config_name: the configuration name, defaults to None
:type config_name: Optional[str], optional
:param orchestration_scenario: the orchestration scenario ID, defaults to "orchestration"
:type orchestration_scenario: str, optional
:param executable_id: the orchestration executable ID, defaults to "orchestration"
:type executable_id: str, optional
:return: the orchestration API URL or None if no deployment is found.
:rtype: Optional[str]
get_orchestration_api_url(proxy_client: gen_ai_hub.proxy.gen_ai_hub_proxy.client.GenAIHubProxyClient, deployment_id: Optional[str] = None, config_name: Optional[str] = None, config_id: Optional[str] = None) -> str
Retrieves the orchestration API URL based on provided deployment or configuration details.
 
:param proxy_client: the GenAIHubProxyClient instance.
:type proxy_client: GenAIHubProxyClient
:param deployment_id: the deployment ID, defaults to None
:type deployment_id: Optional[str], optional
:param config_name: the configuration name, defaults to None
:type config_name: Optional[str], optional
:param config_id: the configuration ID, defaults to None
:type config_id: Optional[str], optional
:raises ValueError: throws if no orchestration deployment is found.
:return: the orchestration API URL.
:rtype: str

 
Data
        CONFIG_AND_CONFIG_REF_ERROR_TEXT = 'Cannot provide both a configuration and a configuration reference.'
ChatMessage = typing.Union[gen_ai_hub.orchestration_v2.models....hestration_v2.models.message.ResponseChatMessage]
Iterable = typing.Iterable
List = typing.List
Optional = typing.Optional
OrchestrationConfigReference = gen_ai_hub.orchestration_v2.models.config.Comple...figurationReferenceByNameScenarioVersionConfigRef
Union = typing.Union
V2_COMPLETION_SUFFIX = '/v2/completion'
V2_EMBEDDINGS_SUFFIX = '/v2/embeddings'