Spaces:
Paused
Paused
| # | |
| # SPDX-FileCopyrightText: Hadad <[email protected]> | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # | |
| import json # Import json module to decode JSON formatted strings from server responses | |
| import httpx # Import httpx library to perform asynchronous HTTP requests with HTTP/2 support | |
| # Import functions to add opening and closing tags around reasoning text for proper formatting | |
| from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close # Functions to wrap reasoning with tags | |
| # Define asynchronous function to send a POST request and stream the response from the server | |
async def httpx_transport(host: str, headers: dict, payload: dict, mode: str):
    """
    Stream a POST response over HTTP/2 and yield accumulated reasoning and content text.

    The response is read line by line. Only lines that start with "data:" (after
    stripping surrounding whitespace) are considered; the remainder of the line is
    parsed as JSON and the incremental update is read from
    ``data["choices"][0]["delta"]``. Lines whose payload is not valid JSON (for
    example a "[DONE]" terminator) or does not have that shape are skipped.

    Two text buffers are maintained and every yield returns the *accumulated*
    buffer, not just the newest fragment:

    - Reasoning: while the delta carries a non-empty "reasoning" string, reasoning
      is enabled (``mode != "/no_think"``) and no content has been emitted yet, the
      fragment is appended to the reasoning buffer and the buffer is yielded. The
      opening tag is added once via ``reasoning_tag_open`` before the first
      fragment. Literal "<think>" markers inside any fragment are removed so the
      opening tag is never duplicated.
    - Content: when the first non-empty "content" string arrives after reasoning
      was seen, the reasoning buffer is closed via ``reasoning_tag_close`` and
      yielded, then an empty string is yielded as a separator, then the content
      buffer is yielded. Every later content fragment is appended and the content
      buffer is yielded again. If reasoning never appeared or is disabled, content
      is accumulated and yielded directly.

    A delta that carries reasoning while the reasoning phase is active is treated as
    a reasoning-only chunk; any "content" in that same delta is not emitted.

    Parameters:
        host (str): URL the POST request is sent to.
        headers (dict): HTTP headers sent with the request.
        payload (dict): Request body, sent as JSON.
        mode (str): "/no_think" disables reasoning output; any other value leaves
            it enabled.

    Yields:
        str: The accumulated reasoning text, an empty-string separator, or the
        accumulated content text, in that order as the stream progresses.

    Raises:
        Exceptions raised by httpx while connecting or reading the stream propagate
        to the caller; only malformed data lines are skipped.

    NOTE(review): the HTTP status code is not inspected here, so an error response
    without "data:" lines simply yields nothing - confirm callers expect that.
    NOTE(review): if the stream ends while reasoning is still in progress, the
    closing tag is never emitted - confirm whether callers rely on this.
    """
    reasoning = ""          # Accumulated reasoning text, including the opening tag once added
    reasoning_tag = False   # True once the opening tag has been inserted
    reasoning_seen = False  # True once any delta has carried non-empty reasoning
    reasoning_done = False  # True once the first content fragment has closed the reasoning phase
    content = ""            # Accumulated answer text
    thinking_enabled = mode != "/no_think"

    # timeout=None keeps long-running generations from being cut off mid-stream.
    async with httpx.AsyncClient(timeout=None, http2=True) as client:
        async with client.stream("POST", host, headers=headers, json=payload) as response:
            async for chunk in response.aiter_lines():
                # Test and slice the same stripped string so the 5-character
                # "data:" prefix is removed at the correct offset.
                line = chunk.strip()
                if not line.startswith("data:"):
                    continue

                # Keep the guarded region to the parse only: exceptions from the
                # yields or tag helpers below must not be silently discarded.
                try:
                    choice = json.loads(line[5:])["choices"][0]["delta"]
                except (json.JSONDecodeError, KeyError, IndexError, TypeError):
                    continue
                if not isinstance(choice, dict):
                    continue

                # Treat missing, empty, or non-string fields as "nothing to add".
                reasoning_text = choice.get("reasoning")
                if not isinstance(reasoning_text, str):
                    reasoning_text = ""
                content_text = choice.get("content")
                if not isinstance(content_text, str):
                    content_text = ""

                if reasoning_text:
                    reasoning_seen = True

                # Reasoning phase: extend the reasoning buffer and emit it.
                if reasoning_text and thinking_enabled and not reasoning_done:
                    if not reasoning_tag:
                        reasoning_tag = True
                        reasoning = reasoning_tag_open(reasoning)
                    # Strip model-emitted opening tags from every fragment,
                    # including the first, so the tag appears exactly once.
                    reasoning += reasoning_text.replace("<think>", "")
                    yield reasoning
                    continue

                if not content_text:
                    continue

                # First content after reasoning: close and flush the reasoning
                # block, then emit the empty-string separator.
                if reasoning_seen and thinking_enabled and not reasoning_done:
                    reasoning_done = True
                    if reasoning_tag:
                        reasoning = reasoning_tag_close(reasoning)
                        yield reasoning
                    yield ""

                content += content_text
                yield content