"""Planner agent and core agent team wiring, plus a small manual demo script."""

from agno.agent import RunEvent
from agno.models.google import Gemini
from agno.db.sqlite import SqliteDb

from src.infra.logger import get_logger
from src.agent.base import creat_agent, creat_team, get_context, UserState, Location

logger = get_logger(__name__)

# Everything the planner emits after this marker is hidden from the console.
_HIDDEN_PAYLOAD_MARKER = "@@@"


class AgentManager:
    """Own the planner agent and the core team, and drive their streamed runs.

    The planner turns user messages into a ``task_list`` string; the core team
    (scout, optimizer, traffic and weather members) is then run against that
    task list.
    """

    def __init__(self, user_state: UserState, models_dict: dict, tool_hooks_dict: dict, hooks_dict: dict):
        """Build the planner agent and the core team.

        Args:
            user_state: Used to build the shared context (``get_context``) and
                the ``timezone_identifier`` (``user_state.utc_offset``).
            models_dict: Model per role; must contain ``"planner"``, ``"team"``
                and one entry per team member name.
            tool_hooks_dict: Despite its name, this is forwarded as the
                per-role *tools* mapping (role name -> list of tools).
            hooks_dict: Per-role tool hooks (role name -> list of hooks).
        """
        # Keyword arguments shared by every agent and by the team.
        self.base_kwargs = {
            "additional_context": get_context(user_state),
            "timezone_identifier": user_state.utc_offset,
            "add_datetime_to_context": True,
        }

        self._planer_db = SqliteDb(db_file="tmp/agents.db")
        self._planer_session_state = {"task_list": None}
        self.planner_agent = creat_agent(
            name="planner",
            model=models_dict["planner"],
            add_session_state_to_context=True,
            db=self._planer_db,
            markdown=True,
            **self.base_kwargs,
        )

        self._creat_agent_team(
            models=models_dict,
            tools=tool_hooks_dict,
            hooks=hooks_dict,
        )

    def _creat_agent_team(self, models: dict[str, object], tools: dict[str, object], hooks: dict[str, object]):
        """Create the member agents and assemble them into ``self.core_team``.

        Each member is also exposed as ``self.<name>_agent``. Roles missing
        from ``tools`` or ``hooks`` get an empty list; a role missing from
        ``models`` raises ``KeyError``.
        """
        self._team_db = SqliteDb(db_file="tmp/team.db")
        self._team_session_state = {
            "scout_pois": {},
            "optimized_route": {},
            "traffic_data": [],
            "weather_forecasts": {},
            "task_list": None,
        }

        member_names = ["scout", "optimizer", "traffic", "weather"]
        members = []
        for name in member_names:
            member = creat_agent(
                name=name,
                model=models[name],
                tools=tools.get(name, []),
                tool_hooks=hooks.get(name, []),
                markdown=False,
                **self.base_kwargs,
            )
            setattr(self, f"{name}_agent", member)
            members.append(member)

        self.core_team = creat_team(
            name="team",
            model=models["team"],
            members=members,
            tools=tools.get("team", []),
            tool_hooks=hooks.get("team", []),
            db=self._team_db,
            markdown=True,
            **self.base_kwargs,
        )

    @staticmethod
    def _partial_marker_len(text: str, marker: str) -> int:
        """Return the length of the longest proper marker prefix that ends ``text``."""
        for size in range(min(len(marker) - 1, len(text)), 0, -1):
            if text.endswith(marker[:size]):
                return size
        return 0

    @staticmethod
    def _planner_stream_handle(stream_item):
        """Print the visible part of a planner stream and extract its payload.

        Only ``RunEvent.run_content`` events are consumed. Text before the
        ``@@@`` marker is printed as it arrives; the marker and everything
        after it are never printed, even when the marker is split across two
        chunks.

        Returns:
            ``(json_data, response)`` where ``response`` is the full
            concatenated text and ``json_data`` is the text from the first
            ``{`` after the marker (or, failing that, the first ``{`` anywhere)
            to the end of the response. ``json_data`` is ``None`` when the
            response contains no ``{`` at all. The payload is not parsed or
            validated here.
        """
        marker = _HIDDEN_PAYLOAD_MARKER
        response = ""
        printed_upto = 0  # offset into ``response`` already written to stdout
        show = True

        for chunk in stream_item:
            if chunk.event != RunEvent.run_content:
                continue
            content = chunk.content
            if not content:
                # Content events may carry no text; nothing to accumulate.
                continue
            response += content
            if not show:
                continue

            # Nothing at or past ``printed_upto`` has been shown yet, and a
            # marker can never start before it (partial markers are held back).
            marker_at = response.find(marker, printed_upto)
            if marker_at != -1:
                show = False
                visible_end = marker_at
            else:
                # Hold back a trailing "@"/"@@" until the next chunk tells us
                # whether it is the start of the marker.
                visible_end = len(response) - AgentManager._partial_marker_len(response, marker)

            if visible_end > printed_upto:
                # NOTE(review): each chunk is printed on its own line, unlike
                # the team handler which streams with end="" — confirm intent.
                print(response[printed_upto:visible_end])
                printed_upto = visible_end

        if show and printed_upto < len(response):
            # The stream ended on a held-back partial marker; it was plain text.
            print(response[printed_upto:])

        marker_at = response.find(marker)
        search_from = marker_at + len(marker) if marker_at != -1 else 0
        brace_at = response.find("{", search_from)
        if brace_at == -1:
            brace_at = response.find("{")
        json_data = response[brace_at:] if brace_at != -1 else None
        return json_data, response

    def planner_message(self, message):
        """Send a user message to the planner and store the resulting task list.

        When the planner reply carries no ``{...}`` payload, the previously
        stored task list is left untouched and a warning is logged.
        """
        planner_stream = self.planner_agent.run(
            f"help user to update the task_list, user's message: {message}",
            stream=True,
            stream_events=True,
            session_state=self._planer_session_state,
        )
        json_data, _response = self._planner_stream_handle(planner_stream)
        if json_data is None:
            logger.warning("Planner reply contained no JSON payload; keeping the previous task list.")
            return
        self._planer_session_state["task_list"] = json_data

    @property
    def task_list(self):
        """The task list last stored by ``planner_message`` (``None`` initially)."""
        return self._planer_session_state["task_list"]

    @staticmethod
    def _core_team_stream_handle(stream_item):
        """Print a console trace for each event of a core-team stream."""
        for event in stream_item:
            if event.event == "TeamRunContent":
                print(f"{event.content}", end="", flush=True)
            elif event.event == "TeamToolCallStarted":
                if event.tool.tool_name == "delegate_task_to_member":
                    print(event.tool)
                    # print(f"Supervisor began assigning tasks to member - {event.tool.tool_args['member_id']}...")
                else:
                    print(f"Supervisor started using the tools: {event.tool.tool_name}")
            elif event.event == "TeamToolCallCompleted":
                if event.tool.tool_name == "delegate_task_to_member":
                    print(f"{event.tool.tool_args['member_id']} has completed the task assigned by the supervisor...")
                    # print(event.tool)
                else:
                    print(f"Supervisor stop using tools:: {event.tool.tool_name}")
            elif event.event == "ToolCallStarted":
                print(f"{event.agent_id} Start using tools: {event.tool.tool_name}")
            elif event.event == "ToolCallCompleted":
                print(f"{event.agent_id} Stop using tools: {event.tool.tool_name}")
            elif event.event == "TeamReasoningStep":
                print(f"Supervisor is reasoning: {event.content}")

    def core_team_start(self):
        """Run the core team against the current task list.

        Returns:
            Whatever ``self.core_team.get_session_state()`` returns after the
            streamed run has been fully consumed.

        Raises:
            ValueError: If no task list has been produced yet.
        """
        if not self.task_list:
            raise ValueError("Task list is empty, cannot start core team.")

        message = f"""
        Based on this structured task list, please coordinate with the team members to:
        1. Use scout to find specific locations for each task
        2. Use optimizer to optimize the route
        3. Use weather to check conditions for tomorrow
        4. Use traffic to calculate routes and travel times
        5. Provide a comprehensive plan and route plan with all details

        "Once ALL scout tasks complete, IMMEDIATELY proceed to Step 3"
        "DO NOT wait for user input, the user info is already provided in the context"
        "You MUST delegate to Optimizer automatically"

        Here is the task list:
        {self.task_list}
        """

        self._team_session_state["task_list"] = self.task_list
        team_stream = self.core_team.run(
            message,
            stream=True,
            stream_events=True,
            session_state=self._team_session_state,
        )
        self._core_team_stream_handle(team_stream)
        # Previously fetched and discarded; hand it back so callers can use it.
        return self.core_team.get_session_state()


if __name__ == "__main__":
    from src.toolkits.googlemap_toolkit import GoogleMapsToolkit
    from src.toolkits.openweather_toolkit import OpenWeatherToolkit
    from src.toolkits.optimization_toolkit import OptimizationToolkit
    from src.infra.config import get_settings

    settings = get_settings()

    maps_kit = GoogleMapsToolkit(api_key=settings.google_maps_api_key)
    weather_kit = OpenWeatherToolkit(api_key=settings.openweather_api_key)
    opt_kit = OptimizationToolkit(api_key=settings.google_maps_api_key)

    state = UserState(location=Location(lat=25.058903, lng=121.549131))

    model = Gemini(
        id="gemini-2.5-flash-lite",
        thinking_budget=512,
        api_key=settings.gemini_api_key,
    )
    main_model = Gemini(
        id="gemini-2.5-flash",
        thinking_budget=1024,
        api_key=settings.gemini_api_key,
    )

    models = {
        "planner": main_model,
        "team": main_model,
        "scout": model,
        "optimizer": model,
        "traffic": model,
        "weather": model,
    }

    tools_dict = {
        "scout": [maps_kit.search_places],
        "optimizer": [opt_kit.solve_route_optimization],
        "traffic": [maps_kit.compute_routes],  # or get_directions
        "weather": [weather_kit],
        "team": [],
    }

    am = AgentManager(user_state=state, models_dict=models, tool_hooks_dict=tools_dict, hooks_dict={})

    am.planner_message("I'm going to San Francisco for tourism tomorrow, please help me plan a one-day itinerary.")

    print("\n[Planner Phase Complete]")
    print(f"Task List: {am.task_list}")

    # if am.task_list:
    #     print("\n[Core Team Start]")
    #     am.core_team_start()