Spaces:
Paused
Paused
| # | |
| # SPDX-FileCopyrightText: Hadad <[email protected]> | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # | |
| import json # Import json module to parse JSON formatted strings from server response lines | |
| import aiohttp # Import aiohttp library to perform asynchronous HTTP requests and handle streaming responses | |
| # Import helper functions to add opening and closing reasoning tags around reasoning text | |
| from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close # Functions to wrap reasoning with tags | |
# Asynchronously POST a request and stream the server's response back as text snapshots.
async def aiohttp_transport(host: str, headers: dict, payload: dict, mode: str):
    """
    POST ``payload`` to ``host`` and stream the ``data:`` lines of the response as text snapshots.

    Each ``data:`` line is parsed as JSON, and its ``choices[0]["delta"]`` dict is inspected
    for ``"reasoning"`` and ``"content"`` strings. The generator yields cumulative snapshots,
    not individual deltas:

    1. While reasoning deltas arrive (and ``mode`` is not ``"/no_think"``), it yields the
       reasoning accumulated so far. The opening tag is added once via ``reasoning_tag_open``.
       Literal ``"<think>"`` substrings inside reasoning deltas are removed, so the opening tag
       is not duplicated.
    2. When non-empty content arrives during the reasoning phase, it yields the reasoning
       passed through ``reasoning_tag_close``, then ``""`` as a separator, then the first
       content snapshot. This also happens when reasoning and content share one delta.
    3. After that (or from the start, when no reasoning has been seen or
       ``mode == "/no_think"``), it yields the content accumulated so far after every
       non-empty content delta.

    If the stream ends while the reasoning block is still open (no content ever arrived), one
    final snapshot passed through ``reasoning_tag_close`` is yielded. This way the caller does
    not end on an unterminated reasoning block.

    Args:
        host: URL the request is POSTed to.
        headers: HTTP headers sent with the request.
        payload: Request body, serialised as JSON by aiohttp.
        mode: ``"/no_think"`` suppresses reasoning output. Any other value allows it.

    Yields:
        str: cumulative reasoning or content text, with ``""`` marking the switch from
        reasoning to content.

    Raises:
        aiohttp.ClientResponseError: for an HTTP error status (via ``raise_for_status``).

    Errors raised while connecting or reading the body are not caught and propagate to the
    caller. Only malformed or unexpected ``data:`` payloads are tolerated:
    invalid JSON, a missing or empty ``choices``, a missing ``delta``, or a non-dict delta.
    Such lines are skipped. A ``data: [DONE]`` line ends the stream.
    """
    show_reasoning = mode != "/no_think"
    reasoning = ""  # Cumulative reasoning text; the opening tag is added on the first chunk
    reasoning_tag = False  # True once reasoning_tag_open has been applied
    reasoning_seen = False  # True once any delta has carried non-empty reasoning
    reasoning_done = False  # True once reasoning was closed and content output began
    content = ""  # Cumulative content text

    # total=None disables the overall timeout so long generations are not cut off.
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:
        async with session.post(host, headers=headers, json=payload) as resp:
            resp.raise_for_status()
            async for line_bytes in resp.content:
                # errors="replace": one invalid byte should not abort the whole stream.
                line = line_bytes.decode("utf-8", errors="replace").strip()
                if not line.startswith("data:"):
                    continue
                data_text = line[5:].strip()
                if data_text == "[DONE]":
                    break

                # Guard only the parsing step. Keeping the yields and the tag helpers outside
                # the try means exceptions thrown into the generator, or raised by those
                # helpers, are not silently swallowed.
                try:
                    delta = json.loads(data_text)["choices"][0]["delta"]
                except (ValueError, LookupError, TypeError):
                    continue  # Malformed JSON or a chunk without choices/delta
                if not isinstance(delta, dict):
                    continue

                reasoning_text = delta.get("reasoning")
                content_text = delta.get("content")
                if not isinstance(reasoning_text, str):
                    reasoning_text = ""
                if not isinstance(content_text, str):
                    content_text = ""

                if reasoning_text:
                    reasoning_seen = True
                # Reasoning output is active only after reasoning has appeared, when the mode
                # allows it, and before content has taken over.
                in_reasoning = reasoning_seen and show_reasoning and not reasoning_done

                if in_reasoning and reasoning_text:
                    if not reasoning_tag:
                        reasoning_tag = True
                        reasoning = reasoning_tag_open(reasoning)
                    reasoning += reasoning_text.replace("<think>", "")
                    yield reasoning

                if not content_text:
                    continue

                if in_reasoning:
                    # First content after reasoning: close the block and emit the separator.
                    reasoning_done = True
                    if reasoning_tag:
                        reasoning = reasoning_tag_close(reasoning)
                    yield reasoning
                    yield ""

                content += content_text
                yield content

    # The stream ended mid-reasoning (no content arrived): emit a closed final block.
    if reasoning_tag and not reasoning_done:
        yield reasoning_tag_close(reasoning)