gen_ai_hub.orchestration.sse_client
index
/home/jenkins/agent/workspace/ation_generative-ai-hub-sdk_main/gen_ai_hub/orchestration/sse_client.py

Module for Server-Sent Events (SSE) clients for orchestration responses.
 
This module provides both synchronous and asynchronous SSE clients for iterating over streaming responses.
Each client is responsible for handling HTTP errors and for closing the underlying HTTP stream
when iteration is complete.

 
Modules
       
dacite
httpx
json

 
Classes
       
builtins.object
AsyncSSEClient
SSEClient

 
class AsyncSSEClient(builtins.object)
    AsyncSSEClient(response_cm, prefix: str = 'data: ', final_message: str = '[DONE]')
 
An asynchronous SSE client for iterating over streaming responses.
 
This client wraps an asynchronous HTTP stream (provided as a context manager) and ensures
that the stream is properly opened and closed. It also checks for HTTP errors upon entering the stream.
 
  Methods defined here:
async __aenter__(self)
Asynchronously enters the context for the streaming response.
 
It awaits the response, checks for HTTP errors, and if an error occurs,
reads the content and raises an OrchestrationError.
 
return: Self, with the streaming response stored.
rtype: AsyncSSEClient
async __aexit__(self, exc_type, exc_val, exc_tb)
Asynchronously exits the context, ensuring that the context manager is properly closed.
__aiter__(self)
Returns the async iterator (self). The initialization of the stream is deferred until the first
call to __anext__.
async __anext__(self)
Asynchronously retrieves the next event from the stream. On the first call, it enters the asynchronous
context to start the stream. When the stream is exhausted or the final message is received, it properly
exits the context.
 
return: The next parsed event from the stream.
rtype: OrchestrationResponseStreaming
raises StopAsyncIteration: When the stream is exhausted.
__init__(self, response_cm, prefix: str = 'data: ', final_message: str = '[DONE]')
Initializes the AsyncSSEClient.
 
:param response_cm: An asynchronous context manager for the HTTP streaming response.
:type response_cm: the type of an async context manager returning httpx.Response
:param prefix: The SSE data prefix, defaults to "data: "
:type prefix: str, optional
:param final_message: The message indicating the end of the stream, defaults to "[DONE]"
:type final_message: str, optional

Data descriptors defined here:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)

 
class SSEClient(builtins.object)
    SSEClient(response_cm, prefix: str = 'data: ', final_message: str = '[DONE]')
 
A synchronous Server-Sent Events (SSE) client that wraps an httpx.Response for iterating
over streaming responses.
 
This client reads data chunks from the HTTP stream and parses each SSE event.
For performance reasons the underlying HTTP stream is reused for subsequent calls.
 
  Methods defined here:
__enter__(self)
Synchronously enters the context for the streaming response.
 
It awaits the response, checks for HTTP errors, and if an error occurs,
reads the content and raises an OrchestrationError.
 
return: Self, with the streaming response stored.
rtype: SSEClient
__exit__(self, exc_type, exc_val, exc_tb)
Synchronously exits the context, ensuring that the context manager is properly closed.
__init__(self, response_cm, prefix: str = 'data: ', final_message: str = '[DONE]')
Initializes the SSEClient.
 
:param response_cm: An httpx.Response context manager for the streaming response.
:type response_cm: httpx.Response
:param prefix: The prefix string that identifies SSE event data, defaults to "data: "
:type prefix: str, optional
:param final_message: The message that indicates the end of the stream, defaults to "[DONE]"
:type final_message: str, optional
__iter__(self) -> Iterator
Returns self as an iterator. Opens the HTTP stream and initializes the internal iterator.
__next__(self)
Retrieves the next parsed SSE event from the stream.
It skips any lines that do not start with the expected prefix. When the final message is encountered
or the stream is exhausted, it closes the stream and raises StopIteration.
iter_lines(self) -> Iterable[str]
Reads data chunks from the HTTP stream and yields complete lines.
 
This method accumulates incoming chunks until a newline is encountered, yielding one complete
line at a time.
 
yield: Complete lines of text from the streaming response.

Data descriptors defined here:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)

 
Data
        AsyncIterator = typing.AsyncIterator
Iterable = typing.Iterable
Iterator = typing.Iterator